优化报警信息构建和数据库批量操作逻辑

This commit is contained in:
lin 2026-04-04 00:12:52 +08:00
parent d307cf1073
commit 2d1608ca7a
8 changed files with 82 additions and 158 deletions

View File

@ -129,7 +129,7 @@ public class DeviceAlarmNotify {
return AlarmType.DeviceLowTemperature;
}
}
if (alarmMethod == DeviceAlarmMethod.GPS.getVal()) {
if (alarmMethod == DeviceAlarmMethod.Video.getVal()) {
// 5为视频报警
// 报警方式为5时,
// 取值如下:

View File

@ -19,6 +19,7 @@ import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.service.bean.Alarm;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@ -173,29 +174,17 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
int limitCount = 500;
if (!addChannelList.isEmpty()) {
if (addChannelList.size() > limitCount) {
for (int i = 0; i < addChannelList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > addChannelList.size()) {
toIndex = addChannelList.size();
}
result += channelMapper.batchAdd(addChannelList.subList(i, toIndex));
}
}else {
result += channelMapper.batchAdd(addChannelList);
int end = Math.min(i + limitCount, addChannelList.size());
List<DeviceChannel> batchList = addChannelList.subList(i, end);
result += channelMapper.batchAdd(batchList);
}
}
if (!updateChannelList.isEmpty()) {
if (updateChannelList.size() > limitCount) {
for (int i = 0; i < updateChannelList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > updateChannelList.size()) {
toIndex = updateChannelList.size();
}
result += channelMapper.batchUpdate(updateChannelList.subList(i, toIndex));
}
}else {
result += channelMapper.batchUpdate(updateChannelList);
int end = Math.min(i + limitCount, updateChannelList.size());
List<DeviceChannel> batchList = updateChannelList.subList(i, end);
result += channelMapper.batchUpdate(batchList);
}
}
return result;
@ -461,29 +450,17 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
}
int limitCount = 500;
if (!addChannels.isEmpty()) {
if (addChannels.size() > limitCount) {
for (int i = 0; i < addChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > addChannels.size()) {
toIndex = addChannels.size();
}
channelMapper.batchAdd(addChannels.subList(i, toIndex));
}
}else {
channelMapper.batchAdd(addChannels);
int end = Math.min(i + limitCount, addChannels.size());
List<DeviceChannel> batchList = addChannels.subList(i, end);
channelMapper.batchAdd(batchList);
}
}
if (!updateChannels.isEmpty()) {
if (updateChannels.size() > limitCount) {
for (int i = 0; i < updateChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > updateChannels.size()) {
toIndex = updateChannels.size();
}
channelMapper.batchUpdate(updateChannels.subList(i, toIndex));
}
}else {
channelMapper.batchUpdate(updateChannels);
int end = Math.min(i + limitCount, updateChannels.size());
List<DeviceChannel> batchList = updateChannels.subList(i, end);
channelMapper.batchUpdate(batchList);
}
// 不对收到的通道做比较已确定是否真的发生变化所以不发送更新通知

View File

@ -239,17 +239,12 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
return;
}
int limitCount = 300;
if (offlineDevices.size() > limitCount) {
for (int i = 0; i < offlineDevices.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > offlineDevices.size()) {
toIndex = offlineDevices.size();
}
deviceMapper.offlineByList(offlineDevices.subList(i, toIndex));
}
}else {
deviceMapper.offlineByList(offlineDevices);
int endIndex = Math.min(i + limitCount, offlineDevices.size());
List<Device> subList = offlineDevices.subList(i, endIndex);
deviceMapper.offlineByList(subList);
}
for (Device device : offlineDevices) {
device.setOnLine(false);
redisCatchStorage.updateDevice(device);
@ -851,16 +846,10 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
int limitCount = 300;
if (!deviceList.isEmpty()) {
if (deviceList.size() > limitCount) {
for (int i = 0; i < deviceList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > deviceList.size()) {
toIndex = deviceList.size();
}
deviceMapper.batchUpdate(deviceList.subList(i, toIndex));
}
}else {
deviceMapper.batchUpdate(deviceList);
int endIndex = Math.min(i + limitCount, deviceList.size());
List<Device> subList = deviceList.subList(i, endIndex);
deviceMapper.batchUpdate(subList);
}
for (Device device : deviceList) {
redisCatchStorage.updateDevice(device);

View File

@ -258,16 +258,10 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
int result = 0;
if (permission) {
int limitCount = 1000;
if (commonGBChannelList.size() > limitCount) {
for (int i = 0; i < commonGBChannelList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > commonGBChannelList.size()) {
toIndex = commonGBChannelList.size();
}
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "OFF");
}
} else {
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "OFF");
int end = Math.min(i + limitCount, commonGBChannelList.size());
List<CommonGBChannel> batchList = commonGBChannelList.subList(i, end);
result += commonGBChannelMapper.updateStatusForListById(batchList, "OFF");
}
log.info("[通道离线] 保存入库 共 {} 个改变", result);
}
@ -309,16 +303,10 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
if (permission) {
// 批量更新
int limitCount = 1000;
if (commonGBChannelList.size() > limitCount) {
for (int i = 0; i < commonGBChannelList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > commonGBChannelList.size()) {
toIndex = commonGBChannelList.size();
}
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "ON");
}
} else {
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "ON");
int end = Math.min(i + limitCount, commonGBChannelList.size());
List<CommonGBChannel> batchList = commonGBChannelList.subList(i, end);
result += commonGBChannelMapper.updateStatusForListById(batchList, "ON");
}
}
try {
@ -341,16 +329,10 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
// 批量保存
int limitCount = 1000;
int result = 0;
if (commonGBChannels.size() > limitCount) {
for (int i = 0; i < commonGBChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > commonGBChannels.size()) {
toIndex = commonGBChannels.size();
}
result += commonGBChannelMapper.batchAdd(commonGBChannels.subList(i, toIndex));
}
} else {
result += commonGBChannelMapper.batchAdd(commonGBChannels);
int end = Math.min(i + limitCount, commonGBChannels.size());
List<CommonGBChannel> batchList = commonGBChannels.subList(i, end);
result += commonGBChannelMapper.batchAdd(batchList);
}
try {
// 发送catalog
@ -372,16 +354,10 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
// 批量保存
int limitCount = 1000;
int result = 0;
if (commonGBChannels.size() > limitCount) {
for (int i = 0; i < commonGBChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > commonGBChannels.size()) {
toIndex = commonGBChannels.size();
}
result += commonGBChannelMapper.batchUpdate(commonGBChannels.subList(i, toIndex));
}
} else {
result += commonGBChannelMapper.batchUpdate(commonGBChannels);
int end = Math.min(i + limitCount, commonGBChannels.size());
List<CommonGBChannel> batchList = commonGBChannels.subList(i, end);
result += commonGBChannelMapper.batchUpdate(batchList);
}
log.info("[更新多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result);
}
@ -404,16 +380,10 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
List<CommonGBChannel> oldChanelListByChannels = commonGBChannelMapper.queryOldChanelListByChannels(commonGBChannels);
int limitCount = 1000;
int result = 0;
if (commonGBChannels.size() > limitCount) {
for (int i = 0; i < commonGBChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > commonGBChannels.size()) {
toIndex = commonGBChannels.size();
}
result += commonGBChannelMapper.updateStatus(commonGBChannels.subList(i, toIndex));
}
} else {
result += commonGBChannelMapper.updateStatus(commonGBChannels);
int end = Math.min(i + limitCount, commonGBChannels.size());
List<CommonGBChannel> batchList = commonGBChannels.subList(i, end);
result += commonGBChannelMapper.updateStatus(batchList);
}
log.warn("[更新多个通道状态] 通道数量为{},成功保存:{}", commonGBChannels.size(), result);
// 发送通过更新通知
@ -925,16 +895,10 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
@Override
public void updateGPS(List<CommonGBChannel> commonGBChannels) {
int limitCount = 1000;
if (commonGBChannels.size() > limitCount) {
for (int i = 0; i < commonGBChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > commonGBChannels.size()) {
toIndex = commonGBChannels.size();
}
commonGBChannelMapper.updateGps(commonGBChannels.subList(i, toIndex));
}
} else {
commonGBChannelMapper.updateGps(commonGBChannels);
int end = Math.min(i + limitCount, commonGBChannels.size());
List<CommonGBChannel> batchList = commonGBChannels.subList(i, end);
commonGBChannelMapper.updateGps(batchList);
}
}
@ -1196,16 +1160,10 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
List<CommonGBChannel> channelList = vectorTileCatch.getChannelList(id);
if (channelList != null && !channelList.isEmpty()) {
int limitCount = 1000;
if (channelList.size() > limitCount) {
for (int i = 0; i < channelList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > channelList.size()) {
toIndex = channelList.size();
}
commonGBChannelMapper.saveLevel(channelList.subList(i, toIndex));
}
} else {
commonGBChannelMapper.saveLevel(channelList);
int end = Math.min(i + limitCount, channelList.size());
List<CommonGBChannel> batchList = channelList.subList(i, end);
commonGBChannelMapper.saveLevel(batchList);
}
}
vectorTileCatch.save(id);

View File

@ -47,13 +47,13 @@ public class Alarm {
@Schema(description = "报警时间")
private Long alarmTime;
public static Alarm buildFromDeviceAlarmNotify(DeviceAlarmNotify deviceAlarmNotify) {
public static Alarm buildFromDeviceAlarmNotify(DeviceAlarmNotify notify) {
Alarm alarm = new Alarm();
alarm.setDescription(deviceAlarmNotify.getAlarmDescription());
alarm.setAlarmType(deviceAlarmNotify.getAlarmTypeEnum());
alarm.setAlarmTime(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(deviceAlarmNotify.getAlarmTime()));
alarm.setLongitude(deviceAlarmNotify.getLongitude());
alarm.setLatitude(deviceAlarmNotify.getLatitude());
alarm.setDescription(notify.getAlarmDescription());
alarm.setAlarmType(notify.getAlarmTypeEnum());
alarm.setAlarmTime(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(notify.getAlarmTime()));
alarm.setLongitude(notify.getLongitude());
alarm.setLatitude(notify.getLatitude());
return alarm;
}

View File

@ -72,13 +72,13 @@ public class AlarmServiceImpl implements IAlarmService {
}
alarm.setChannelId(deviceChannel.getId());
// 分配一个快照路径后续在去补充快照文件
alarm.setSnapPath("snap/alarm_" + UUID.randomUUID() + ".jpg");
alarm.setSnapPath("snap/alarm_" + notify.getChannelId() + "_" + System.currentTimeMillis() + ".jpg");
alarmQueue.offer(alarm);
}
}
@Scheduled(fixedDelay = 500)
public void executeTaskQueue() {
public void executeAlarmQueue() {
if (alarmQueue.isEmpty()) {
return;
}
@ -94,9 +94,12 @@ public class AlarmServiceImpl implements IAlarmService {
return;
}
// 批量保存到数据库
alarmMapper.insertAlarms(handlerCatchDataList);
// 异步处理快照的生成和保存避免影响报警信息的保存效率
int batchSize = 1000;
for (int i = 0; i < handlerCatchDataList.size(); i += batchSize) {
int end = Math.min(i + batchSize, handlerCatchDataList.size());
List<Alarm> batchList = handlerCatchDataList.subList(i, end);
alarmMapper.insertAlarms(batchList);
}
}

View File

@ -223,18 +223,12 @@ public class CloudRecordServiceImpl implements ICloudRecordService {
}
int limitCount = 50;
int resultCount = 0;
if (all.size() > limitCount) {
for (int i = 0; i < all.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > all.size()) {
toIndex = all.size();
int end = Math.min(i + limitCount, all.size());
List<CloudRecordItem> batchList = all.subList(i, end);
resultCount += cloudRecordServiceMapper.updateCollectList(result, batchList);
}
resultCount += cloudRecordServiceMapper.updateCollectList(result, all.subList(i, toIndex));
}
}else {
resultCount = cloudRecordServiceMapper.updateCollectList(result, all);
}
return resultCount;
}

View File

@ -2,10 +2,7 @@ package com.genersoft.iot.vmp.storager.dao;
import com.genersoft.iot.vmp.service.bean.Alarm;
import com.genersoft.iot.vmp.service.bean.AlarmType;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.*;
import java.util.List;
@ -43,6 +40,12 @@ public interface AlarmMapper {
"</script>")
void deleteAlarms(@Param("ids") List<Long> ids);
@Insert("<script>" +
"INSERT INTO wvp_alarm (channel_id, description, snap_path, record_path, longitude, latitude, alarm_type, alarm_time)" +
" VALUES " +
"<foreach collection='handlerCatchDataList' item='item' open='(' separator=',' close=')'>" +
"#{item.channelId}, #{item.description}, #{item.snapPath}, #{item.recordPath}, #{item.longitude}, #{item.latitude}, #{item.alarmType}, #{item.alarmTime}" +
"</foreach>" +
"</script>")
void insertAlarms(List<Alarm> handlerCatchDataList);
}