优化心跳处理逻辑,可接入设备数量更多

This commit is contained in:
lin 2026-01-14 17:17:44 +08:00
parent 7a4d5e551d
commit 4b0cdd5718
7 changed files with 85 additions and 76 deletions

View File

@ -460,6 +460,16 @@ public interface DeviceMapper {
"</script>"})
void batchUpdate(List<Device> devices);
@Update({"<script>" +
"<foreach collection='devices' item='item' separator=';'>" +
" UPDATE" +
" wvp_device" +
" SET keepalive_time=#{item.keepaliveTime}" +
" WHERE device_id=#{item.deviceId}"+
"</foreach>" +
"</script>"})
void batchUpdateForKeepalive(List<Device> devices);
@Select(value = {" <script>" +
"SELECT " +

View File

@ -121,6 +121,9 @@ public interface IDeviceService {
@Transactional
void updateDeviceList(List<Device> deviceList);
@Transactional
void updateDeviceListForKeepalive(List<Device> deviceList);
/**
* 检查设备编号是否已经存在
* @param deviceId 设备编号

View File

@ -306,7 +306,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
device.setHeartBeatCount(3);
device.setHeartBeatInterval(60);
device.setPositionCapability(0);
}
if (sipTransactionInfo != null) {
device.setSipTransactionInfo(sipTransactionInfo);
@ -780,6 +779,30 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
}
@Transactional
@Override
public void updateDeviceListForKeepalive(List<Device> deviceList) {
if (deviceList.isEmpty()){
log.info("[批量更新设备] 列表为空,更细失败");
return;
}
int limitCount = 300;
if (deviceList.size() > limitCount) {
for (int i = 0; i < deviceList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > deviceList.size()) {
toIndex = deviceList.size();
}
deviceMapper.batchUpdateForKeepalive(deviceList.subList(i, toIndex));
}
}else {
deviceMapper.batchUpdateForKeepalive(deviceList);
}
for (Device device : deviceList) {
redisCatchStorage.updateDevice(device);
}
}
@Override
public boolean isExist(String deviceId) {
return getDeviceByDeviceIdFromDb(deviceId) != null;

View File

@ -16,6 +16,7 @@ import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@ -69,6 +70,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
}
@Scheduled(fixedDelay = 400) //每400毫秒执行一次
@Async
public void executeTaskQueue(){
if (taskQueue.isEmpty()) {
return;

View File

@ -17,6 +17,7 @@ import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
@ -61,6 +62,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
}
@Scheduled(fixedDelay = 200) //每200毫秒执行一次
@Async
public void executeTaskQueue() {
if (taskQueue.isEmpty()) {
return;

View File

@ -154,7 +154,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
// 注册失败
response = getMessageFactory().createResponse(Response.FORBIDDEN, request);
response.setReasonPhrase("wrong password");
log.info(title + " 设备:{}, 密码/SIP服务器ID错误, 回复403: {}", deviceId, requestAddress);
log.info("{} 设备:{}, 密码/SIP服务器ID错误, 回复403: {}", title, deviceId, requestAddress);
sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response);
return;
}
@ -163,7 +163,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
response = getMessageFactory().createResponse(Response.OK, request);
// 如果主动禁用了Date头则不添加
if (!userSetting.isDisableDateHeader()) {
// 添加date头
// 添加 date头
SIPDateHeader dateHeader = new SIPDateHeader();
// 使用自己修改的
GbSipDate gbSipDate = new GbSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis());
@ -176,9 +176,9 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response);
return;
}
// 添加Contact头
// 添加 Contact头
response.addHeader(request.getHeader(ContactHeader.NAME));
// 添加Expires头
// 添加 Expires头
response.addHeader(request.getExpires());
if (device == null) {

View File

@ -1,11 +1,9 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd;
import com.genersoft.iot.vmp.common.RemoteAddressInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SipMsgInfo;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskRunner;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
@ -19,6 +17,7 @@ import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@ -27,9 +26,8 @@ import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 状态信息(心跳)报送
@ -41,7 +39,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
private final static String cmdType = "Keepalive";
private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
private final BlockingQueue<Device> taskQueue = new LinkedBlockingQueue<>();
@Autowired
private NotifyMessageHandler notifyMessageHandler;
@ -55,9 +53,6 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
@Autowired
private UserSetting userSetting;
@Autowired
private DynamicTask dynamicTask;
@Override
public void afterPropertiesSet() throws Exception {
notifyMessageHandler.addHandler(cmdType, this);
@ -65,70 +60,44 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
@Override
public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
log.error("[心跳] 待处理消息队列已满 {}返回486 BUSY_HERE消息不做处理", userSetting.getMaxNotifyCountQueue());
return;
// 回复200 OK
try {
responseAck((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 心跳回复: {}", e.getMessage());
}
SIPRequest request = (SIPRequest) evt.getRequest();
RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress());
if (device.getIp() == null || !device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) {
log.info("[收到心跳] 地址变化, {}({}), {}:{}->{}", device.getName(), device.getDeviceId(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort(), request.getLocalAddress().getHostAddress());
device.setPort(remoteAddressInfo.getPort());
device.setHostAddress(IpPortUtil.concatenateIpAndPort(remoteAddressInfo.getIp(), String.valueOf(remoteAddressInfo.getPort())));
device.setIp(remoteAddressInfo.getIp());
device.setLocalIp(request.getLocalAddress().getHostAddress());
}
device.setKeepaliveTime(DateUtil.getNow());
if (device.isOnLine()) {
taskQueue.add(device);
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
if (statusTaskRunner.containsKey(device.getDeviceId())) {
statusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis());
}
} else {
if (userSetting.getGbDeviceOnline() == 1) {
// 对于已经离线的设备判断他的注册是否已经过期
deviceService.online(device, null);
}
}
taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
}
@Scheduled(fixedDelay = 100)
public void executeTaskQueue() {
if (taskQueue.isEmpty()) {
return;
}
List<SipMsgInfo> handlerCatchDataList = new ArrayList<>();
int size = taskQueue.size();
for (int i = 0; i < size; i++) {
SipMsgInfo poll = taskQueue.poll();
if (poll != null) {
handlerCatchDataList.add(poll);
}
}
if (handlerCatchDataList.isEmpty()) {
return;
}
List<Device> deviceListForUpdate = new ArrayList<>();
for (SipMsgInfo sipMsgInfo : handlerCatchDataList) {
if (sipMsgInfo == null) {
continue;
}
RequestEvent evt = sipMsgInfo.getEvt();
// 回复200 OK
try {
responseAck((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 心跳回复: {}", e.getMessage());
}
Device device = sipMsgInfo.getDevice();
SIPRequest request = (SIPRequest) evt.getRequest();
RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress());
if (device.getIp() == null || !device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) {
log.info("[收到心跳] 地址变化, {}({}), {}:{}->{}", device.getName(), device.getDeviceId(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort(), request.getLocalAddress().getHostAddress());
device.setPort(remoteAddressInfo.getPort());
device.setHostAddress(IpPortUtil.concatenateIpAndPort(remoteAddressInfo.getIp(), String.valueOf(remoteAddressInfo.getPort())));
device.setIp(remoteAddressInfo.getIp());
device.setLocalIp(request.getLocalAddress().getHostAddress());
}
device.setKeepaliveTime(DateUtil.getNow());
if (device.isOnLine()) {
deviceListForUpdate.add(device);
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
if (statusTaskRunner.containsKey(device.getDeviceId())) {
statusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis());
}
} else {
if (userSetting.getGbDeviceOnline() == 1) {
// 对于已经离线的设备判断他的注册是否已经过期
deviceService.online(device, null);
}
}
}
if (!deviceListForUpdate.isEmpty()) {
deviceService.updateDeviceList(deviceListForUpdate);
@Scheduled(fixedDelay = 1000)
@Async
public void executeUpdateDeviceList() {
if (!taskQueue.isEmpty()) {
deviceService.updateDeviceListForKeepalive(taskQueue.stream().toList());
taskQueue.clear();
}
}