去除心跳信息入库,提升性能

This commit is contained in:
lin 2026-01-25 21:59:42 +08:00
parent 2c774ae155
commit 31549bce09
12 changed files with 57 additions and 76 deletions

View File

@ -19,6 +19,8 @@ public class VideoManagerConstants {
public static final String ONLINE_MEDIA_SERVERS_PREFIX = "VMP_ONLINE_MEDIA_SERVERS:";
public static final String DEVICE_PREFIX = "VMP_DEVICE_INFO";
public static final String DEVICE_KEEPALIVE_PREFIX = "DEVICE_KEEPALIVE:";
public static final String DEVICE_REGISTER_PREFIX = "DEVICE_REGISTER:";
public static final String INVITE_PREFIX = "VMP_GB_INVITE_INFO";

View File

@ -30,8 +30,6 @@ public interface DeviceMapper {
"port," +
"host_address," +
"expires," +
"register_time," +
"keepalive_time," +
"create_time," +
"update_time," +
"charset," +
@ -65,8 +63,6 @@ public interface DeviceMapper {
"port," +
"host_address," +
"expires," +
"register_time," +
"keepalive_time," +
"heart_beat_interval," +
"heart_beat_count," +
"position_capability," +
@ -98,8 +94,6 @@ public interface DeviceMapper {
"#{port}," +
"#{hostAddress}," +
"#{expires}," +
"#{registerTime}," +
"#{keepaliveTime}," +
"#{heartBeatInterval}," +
"#{heartBeatCount}," +
"#{positionCapability}," +
@ -133,8 +127,6 @@ public interface DeviceMapper {
", port=#{port}" +
", host_address=#{hostAddress}" +
", on_line=#{onLine}" +
", register_time=#{registerTime}" +
", keepalive_time=#{keepaliveTime}" +
", heart_beat_interval=#{heartBeatInterval}" +
", position_capability=#{positionCapability}" +
", heart_beat_count=#{heartBeatCount}" +
@ -166,8 +158,6 @@ public interface DeviceMapper {
"port,"+
"host_address,"+
"expires,"+
"register_time,"+
"keepalive_time,"+
"create_time,"+
"update_time,"+
"charset,"+
@ -208,8 +198,6 @@ public interface DeviceMapper {
"port,"+
"host_address,"+
"expires,"+
"register_time,"+
"keepalive_time,"+
"create_time,"+
"update_time,"+
"charset,"+
@ -242,8 +230,6 @@ public interface DeviceMapper {
"port,"+
"host_address,"+
"expires,"+
"register_time,"+
"keepalive_time,"+
"create_time,"+
"update_time,"+
"charset,"+
@ -277,8 +263,6 @@ public interface DeviceMapper {
"port,"+
"host_address,"+
"expires,"+
"register_time,"+
"keepalive_time,"+
"create_time,"+
"update_time,"+
"charset,"+
@ -356,8 +340,6 @@ public interface DeviceMapper {
",transport" +
",stream_mode" +
",on_line" +
",register_time" +
",keepalive_time" +
",ip" +
",create_time" +
",update_time" +
@ -444,8 +426,6 @@ public interface DeviceMapper {
", port=#{item.port}" +
", host_address=#{item.hostAddress}" +
", on_line=#{item.onLine}" +
", register_time=#{item.registerTime}" +
", keepalive_time=#{item.keepaliveTime}" +
", heart_beat_interval=#{item.heartBeatInterval}" +
", position_capability=#{item.positionCapability}" +
", heart_beat_count=#{item.heartBeatCount}" +
@ -460,17 +440,6 @@ public interface DeviceMapper {
"</script>"})
void batchUpdate(List<Device> devices);
@Update({"<script>" +
"<foreach collection='devices' item='item' separator=';'>" +
" UPDATE" +
" wvp_device" +
" SET keepalive_time=#{item.keepaliveTime}" +
" WHERE id=#{item.id}"+
"</foreach>" +
"</script>"})
void batchUpdateForKeepalive(List<Device> devices);
@Select(value = {" <script>" +
"SELECT " +
"coalesce(custom_name, name) as name, " +
@ -482,8 +451,6 @@ public interface DeviceMapper {
",transport" +
",stream_mode" +
",on_line" +
",register_time" +
",keepalive_time" +
",ip" +
",create_time" +
",update_time" +

View File

@ -120,9 +120,6 @@ public interface IDeviceService {
@Transactional
void updateDeviceList(List<Device> deviceList);
@Transactional
void updateDeviceListForKeepalive(List<Device> deviceList);
/**
* 检查设备编号是否已经存在
* @param deviceId 设备编号

View File

@ -295,7 +295,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
inviteStreamService.clearInviteInfo(device.getDeviceId());
}
device.setUpdateTime(now);
device.setKeepaliveTime(now);
if (device.getHeartBeatCount() == null) {
// 读取设备配置 获取心跳间隔和心跳超时次数 在次之前暂时设置为默认值
device.setHeartBeatCount(3);
@ -375,8 +374,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
return;
}
String deviceId = device.getDeviceId();
log.info("[设备离线] device{} 心跳间隔: {},心跳超时次数: {} 上次心跳时间:{} 上次注册时间: {}", deviceId,
device.getHeartBeatInterval(), device.getHeartBeatCount(), device.getKeepaliveTime(), device.getRegisterTime());
log.info("[设备离线] device{} 心跳间隔: {},心跳超时次数: {}", deviceId, device.getHeartBeatInterval(), device.getHeartBeatCount());
device.setOnLine(false);
cleanOfflineDevice(device);
redisCatchStorage.updateDevice(device);
@ -400,8 +398,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
if (device == null) {
continue;
}
log.info("[设备离线] device{} 心跳间隔: {},心跳超时次数: {} 上次心跳时间:{} 上次注册时间: {}", device.getDeviceId(),
device.getHeartBeatInterval(), device.getHeartBeatCount(), device.getKeepaliveTime(), device.getRegisterTime());
log.info("[设备离线] device{} 心跳间隔: {},心跳超时次数: {}", device.getDeviceId(), device.getHeartBeatInterval(), device.getHeartBeatCount());
device.setOnLine(false);
cleanOfflineDevice(device);
if (isDevice(device.getDeviceId())) {
@ -756,30 +753,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
}
@Transactional
@Override
public void updateDeviceListForKeepalive(List<Device> deviceList) {
if (deviceList.isEmpty()){
log.info("[批量更新设备] 列表为空,更细失败");
return;
}
int limitCount = 300;
if (deviceList.size() > limitCount) {
for (int i = 0; i < deviceList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > deviceList.size()) {
toIndex = deviceList.size();
}
deviceMapper.batchUpdateForKeepalive(deviceList.subList(i, toIndex));
}
}else {
deviceMapper.batchUpdateForKeepalive(deviceList);
}
for (Device device : deviceList) {
redisCatchStorage.updateDevice(device);
}
}
@Override
public boolean isExist(String deviceId) {
return getDeviceByDeviceIdFromDb(deviceId) != null;

View File

@ -79,7 +79,6 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
device.setLocalIp(request.getLocalAddress().getHostAddress());
}
device.setKeepaliveTime(DateUtil.getNow());
if (device.isOnLine()) {
taskQueue.add(device);
@ -92,10 +91,9 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
}
}
}
@Scheduled(fixedDelay = 1, timeUnit = TimeUnit.SECONDS)
@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.SECONDS)
public void executeUpdateDeviceList() {
if (!taskQueue.isEmpty()) {
deviceService.updateDeviceListForKeepalive(taskQueue.stream().toList());
taskQueue.clear();
}
}

View File

@ -184,5 +184,8 @@ public interface IRedisCatchStorage {
String chooseOneServer(String serverId);
void updateDeviceKeepaliveTime(List<Device> deviceList);
void updateDeviceRegisterTime(List<Device> deviceList);
}

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.storager.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.genersoft.iot.vmp.common.ServerInfo;
import com.genersoft.iot.vmp.common.SystemAllInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
@ -22,6 +23,7 @@ import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
@ -542,4 +544,27 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
}
return (String) range.iterator().next();
}
@Override
public void updateDeviceKeepaliveTime(List<Device> deviceList) {
// if (deviceList == null || deviceList.isEmpty()) {
// return;
// }
// // 使用 SessionCallback 保证批量操作在同一个连接中执行
// SessionCallback<Boolean> sessionCallback = session -> {
// // 1. 批量添加心跳数据到列表尾部
// for (Device device : deviceList) {
// session.opsForList().rightPush(VideoManagerConstants.DEVICE_KEEPALIVE_PREFIX + device.getDeviceId(), device);
// }
// // 2. 截取列表只保留最新 100
// session.opsForList().trim(VideoManagerConstants.DEVICE_KEEPALIVE_PREFIX, -1000, -1);
// return true;
// };
// redisTemplate.execute(sessionCallback);
}
@Override
public void updateDeviceRegisterTime(List<Device> deviceList) {
}
}

View File

@ -106,8 +106,6 @@
<!-- <el-checkbox label="报警" disabled :checked="scope.row.subscribeCycleForAlarm > 0"></el-checkbox>-->
</template>
</el-table-column>
<el-table-column prop="keepaliveTime" label="最近心跳" min-width="140" />
<el-table-column prop="registerTime" label="最近注册" min-width="140" />
<el-table-column label="操作" min-width="300" fixed="right">
<template v-slot:default="scope">
<el-button

View File

@ -12,8 +12,6 @@ create table IF NOT EXISTS wvp_device
transport character varying(50) COMMENT '信令传输协议TCP/UDP',
stream_mode character varying(50) COMMENT '拉流方式(主动/被动)',
on_line bool default false COMMENT '在线状态',
register_time character varying(50) COMMENT '注册时间',
keepalive_time character varying(50) COMMENT '最近心跳时间',
ip character varying(50) COMMENT '设备IP地址',
create_time character varying(50) COMMENT '创建时间',
update_time character varying(50) COMMENT '更新时间',

View File

@ -11,8 +11,6 @@ create table IF NOT EXISTS wvp_device
transport character varying(50),
stream_mode character varying(50),
on_line bool default false,
register_time character varying(50),
keepalive_time character varying(50),
ip character varying(50),
create_time character varying(50),
update_time character varying(50),
@ -49,8 +47,6 @@ COMMENT ON COLUMN wvp_device.firmware IS '固件版本号';
COMMENT ON COLUMN wvp_device.transport IS '信令传输协议TCP/UDP';
COMMENT ON COLUMN wvp_device.stream_mode IS '拉流方式(主动/被动)';
COMMENT ON COLUMN wvp_device.on_line IS '在线状态';
COMMENT ON COLUMN wvp_device.register_time IS '注册时间';
COMMENT ON COLUMN wvp_device.keepalive_time IS '最近心跳时间';
COMMENT ON COLUMN wvp_device.ip IS '设备IP地址';
COMMENT ON COLUMN wvp_device.create_time IS '创建时间';
COMMENT ON COLUMN wvp_device.update_time IS '更新时间';

View File

@ -120,6 +120,27 @@ DELIMITER ;
drop index uk_media_server_unique_ip_http_port on wvp_media_server;
/*
* 202601025
*/
DELIMITER // -- 重定义分隔符避免分号冲突
CREATE PROCEDURE `wvp_202601025`()
BEGIN
IF EXISTS (SELECT column_name FROM information_schema.columns
WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_device' and column_name = 'register_time')
THEN
ALTER TABLE wvp_device DROP register_time;
END IF;
IF EXISTS (SELECT column_name FROM information_schema.columns
WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_device' and column_name = 'keepalive_time')
THEN
ALTER TABLE wvp_device DROP keepalive_time;
END IF;
END; //
call wvp_202601025();
DROP PROCEDURE wvp_202601025;
DELIMITER ;

View File

@ -44,3 +44,6 @@ ALTER table wvp_common_group ADD COLUMN IF NOT EXISTS alias varchar(255) default
ALTER table wvp_stream_proxy DROP COLUMN IF EXISTS enable_remove_none_reader;
drop index uk_media_server_unique_ip_http_port on wvp_media_server;
ALTER table wvp_device DROP COLUMN IF EXISTS register_time;
ALTER table wvp_device DROP COLUMN IF EXISTS keepalive_time;