增加设备写锁机制,优化通道数据的并发处理

This commit is contained in:
lin 2026-05-21 16:43:59 +08:00
parent 920ad76eb6
commit c6b8197533
17 changed files with 396 additions and 98 deletions

View File

@ -3,6 +3,9 @@ package com.genersoft.iot.vmp.gb28181.dao;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.controller.bean.Extent;
import com.genersoft.iot.vmp.gb28181.dao.provider.ChannelProvider;
import com.genersoft.iot.vmp.gb28181.dao.provider.MysqlCommonChannelUpsertProvider;
import com.genersoft.iot.vmp.gb28181.dao.provider.PostgresStyleCommonChannelUpsertProvider;
import com.genersoft.iot.vmp.gb28181.dao.provider.H2CommonChannelUpsertProvider;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
import com.genersoft.iot.vmp.web.custom.bean.CameraChannel;
@ -173,6 +176,12 @@ public interface CommonGBChannelMapper {
List<CommonGBChannel> queryInListByStatus(List<CommonGBChannel> commonGBChannelList, @Param("status") String status);
@InsertProvider(type = MysqlCommonChannelUpsertProvider.class, method = "batchUpsert", databaseId = "mysql")
@InsertProvider(type = PostgresStyleCommonChannelUpsertProvider.class, method = "batchUpsert", databaseId = "postgresql")
@InsertProvider(type = PostgresStyleCommonChannelUpsertProvider.class, method = "batchUpsert", databaseId = "kingbase")
@InsertProvider(type = H2CommonChannelUpsertProvider.class, method = "batchUpsert", databaseId = "h2")
int batchUpsert(@Param("channels") List<CommonGBChannel> channels);
@Insert(" <script>" +
"INSERT INTO wvp_device_channel (" +
"gb_device_id," +

View File

@ -5,6 +5,9 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.DeviceMobilePosition;
import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce;
import com.genersoft.iot.vmp.gb28181.dao.provider.DeviceChannelProvider;
import com.genersoft.iot.vmp.gb28181.dao.provider.MysqlChannelUpsertProvider;
import com.genersoft.iot.vmp.gb28181.dao.provider.PostgresStyleUpsertProvider;
import com.genersoft.iot.vmp.gb28181.dao.provider.H2ChannelUpsertProvider;
import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;
@ -196,6 +199,12 @@ public interface DeviceChannelMapper {
@Update(value = {"UPDATE wvp_device_channel SET status='OFF' WHERE id=#{id}"})
void offline(@Param("id") int id);
@InsertProvider(type = MysqlChannelUpsertProvider.class, method = "batchUpsert", databaseId = "mysql")
@InsertProvider(type = PostgresStyleUpsertProvider.class, method = "batchUpsert", databaseId = "postgresql")
@InsertProvider(type = PostgresStyleUpsertProvider.class, method = "batchUpsert", databaseId = "kingbase")
@InsertProvider(type = H2ChannelUpsertProvider.class, method = "batchUpsert", databaseId = "h2")
int batchUpsert(@Param("channels") List<DeviceChannel> channels);
@Insert("<script> " +
"insert into wvp_device_channel " +
"(device_id, data_type, data_device_id, name, manufacturer, model, owner, civil_code, block, " +

View File

@ -0,0 +1,25 @@
package com.genersoft.iot.vmp.gb28181.dao.provider;
import java.util.Map;
public class H2ChannelUpsertProvider {
public String batchUpsert(Map<String, Object> params) {
return "<script>" +
"MERGE INTO wvp_device_channel (" +
"data_device_id, device_id, data_type, name, manufacturer, model, owner, civil_code, block, " +
"address, parental, parent_id, safety_way, register_way, cert_num, certifiable, err_code, end_time, secrecy, " +
"ip_address, port, password, status, longitude, latitude, ptz_type, position_type, room_type, use_type, " +
"supply_light_type, direction_type, resolution, business_group_id, download_speed, svc_space_support_mod, " +
"svc_time_support_mode, create_time, update_time, sub_count, stream_id, has_audio, gps_time, stream_identification, channel_type" +
") KEY (data_device_id, device_id) VALUES " +
"<foreach collection='channels' item='item' separator=','>" +
"(#{item.dataDeviceId}, #{item.deviceId}, #{item.dataType}, #{item.name}, #{item.manufacturer}, #{item.model}, #{item.owner}, #{item.civilCode}, #{item.block}, " +
"#{item.address}, #{item.parental}, #{item.parentId}, #{item.safetyWay}, #{item.registerWay}, #{item.certNum}, #{item.certifiable}, #{item.errCode}, #{item.endTime}, #{item.secrecy}, " +
"#{item.ipAddress}, #{item.port}, #{item.password}, #{item.status}, #{item.longitude}, #{item.latitude}, #{item.ptzType}, #{item.positionType}, #{item.roomType}, #{item.useType}, " +
"#{item.supplyLightType}, #{item.directionType}, #{item.resolution}, #{item.businessGroupId}, #{item.downloadSpeed}, #{item.svcSpaceSupportMod}," +
" #{item.svcTimeSupportMode}, #{item.createTime}, #{item.updateTime}, #{item.subCount}, #{item.streamId}, #{item.hasAudio}, #{item.gpsTime}, #{item.streamIdentification}, #{item.channelType})" +
"</foreach>" +
"</script>";
}
}

View File

@ -0,0 +1,29 @@
package com.genersoft.iot.vmp.gb28181.dao.provider;
import java.util.Map;
public class H2CommonChannelUpsertProvider {
public String batchUpsert(Map<String, Object> params) {
return "<script>" +
"MERGE INTO wvp_device_channel (" +
"gb_device_id, data_type, data_device_id, create_time, update_time, " +
"gb_name, gb_manufacturer, gb_model, gb_owner, gb_civil_code, gb_block, gb_address, " +
"gb_parental, gb_parent_id, gb_safety_way, gb_register_way, gb_cert_num, gb_certifiable, " +
"gb_err_code, gb_end_time, gb_secrecy, gb_ip_address, gb_port, gb_password, gb_status, " +
"gb_longitude, gb_latitude, gb_ptz_type, gb_position_type, gb_room_type, gb_use_type, " +
"gb_supply_light_type, gb_direction_type, gb_resolution, gb_business_group_id, " +
"gb_download_speed, gb_svc_space_support_mod, gb_svc_time_support_mode, enable_broadcast" +
") KEY (gb_device_id) VALUES " +
"<foreach collection='channels' item='item' separator=','>" +
"(#{item.gbDeviceId}, #{item.dataType}, #{item.dataDeviceId}, #{item.createTime}, #{item.updateTime}," +
"#{item.gbName}, #{item.gbManufacturer}, #{item.gbModel}, #{item.gbOwner}, #{item.gbCivilCode}, #{item.gbBlock}, #{item.gbAddress}," +
"#{item.gbParental}, #{item.gbParentId}, #{item.gbSafetyWay}, #{item.gbRegisterWay}, #{item.gbCertNum}, #{item.gbCertifiable}," +
"#{item.gbErrCode}, #{item.gbEndTime}, #{item.gbSecrecy}, #{item.gbIpAddress}, #{item.gbPort}, #{item.gbPassword}, #{item.gbStatus}," +
"#{item.gbLongitude}, #{item.gbLatitude}, #{item.gbPtzType}, #{item.gbPositionType}, #{item.gbRoomType}, #{item.gbUseType}," +
"#{item.gbSupplyLightType}, #{item.gbDirectionType}, #{item.gbResolution}, #{item.gbBusinessGroupId}," +
"#{item.gbDownloadSpeed}, #{item.gbSvcSpaceSupportMod}, #{item.gbSvcTimeSupportMode}, #{item.enableBroadcast})" +
"</foreach>" +
"</script>";
}
}

View File

@ -0,0 +1,43 @@
package com.genersoft.iot.vmp.gb28181.dao.provider;
import java.util.Map;
public class MysqlChannelUpsertProvider {
public String batchUpsert(Map<String, Object> params) {
return "<script>" +
"INSERT INTO wvp_device_channel (" +
"device_id, data_type, data_device_id, name, manufacturer, model, owner, civil_code, block, " +
"address, parental, parent_id, safety_way, register_way, cert_num, certifiable, err_code, end_time, secrecy, " +
"ip_address, port, password, status, longitude, latitude, ptz_type, position_type, room_type, use_type, " +
"supply_light_type, direction_type, resolution, business_group_id, download_speed, svc_space_support_mod, " +
"svc_time_support_mode, create_time, update_time, sub_count, stream_id, has_audio, gps_time, stream_identification, channel_type" +
") VALUES " +
"<foreach collection='channels' item='item' separator=','>" +
"(#{item.deviceId}, #{item.dataType}, #{item.dataDeviceId}, #{item.name}, #{item.manufacturer}, #{item.model}, #{item.owner}, #{item.civilCode}, #{item.block}, " +
"#{item.address}, #{item.parental}, #{item.parentId}, #{item.safetyWay}, #{item.registerWay}, #{item.certNum}, #{item.certifiable}, #{item.errCode}, #{item.endTime}, #{item.secrecy}, " +
"#{item.ipAddress}, #{item.port}, #{item.password}, #{item.status}, #{item.longitude}, #{item.latitude}, #{item.ptzType}, #{item.positionType}, #{item.roomType}, #{item.useType}, " +
"#{item.supplyLightType}, #{item.directionType}, #{item.resolution}, #{item.businessGroupId}, #{item.downloadSpeed}, #{item.svcSpaceSupportMod}," +
" #{item.svcTimeSupportMode}, #{item.createTime}, #{item.updateTime}, #{item.subCount}, #{item.streamId}, #{item.hasAudio}, #{item.gpsTime}, #{item.streamIdentification}, #{item.channelType})" +
"</foreach>" +
"ON DUPLICATE KEY UPDATE " +
"name=VALUES(name), manufacturer=VALUES(manufacturer), model=VALUES(model), owner=VALUES(owner), " +
"civil_code=VALUES(civil_code), block=VALUES(block), address=VALUES(address), " +
"parental=VALUES(parental), parent_id=VALUES(parent_id), safety_way=VALUES(safety_way), " +
"register_way=VALUES(register_way), cert_num=VALUES(cert_num), certifiable=VALUES(certifiable), " +
"err_code=VALUES(err_code), end_time=VALUES(end_time), secrecy=VALUES(secrecy), " +
"ip_address=VALUES(ip_address), port=VALUES(port), password=VALUES(password), " +
"status=VALUES(status), longitude=VALUES(longitude), latitude=VALUES(latitude), " +
"ptz_type=VALUES(ptz_type), position_type=VALUES(position_type), room_type=VALUES(room_type), " +
"use_type=VALUES(use_type), supply_light_type=VALUES(supply_light_type), " +
"direction_type=VALUES(direction_type), resolution=VALUES(resolution), " +
"business_group_id=VALUES(business_group_id), download_speed=VALUES(download_speed), " +
"svc_space_support_mod=VALUES(svc_space_support_mod), " +
"svc_time_support_mode=VALUES(svc_time_support_mode), " +
"update_time=VALUES(update_time), sub_count=VALUES(sub_count), " +
"stream_id=VALUES(stream_id), has_audio=VALUES(has_audio), " +
"gps_time=VALUES(gps_time), stream_identification=VALUES(stream_identification), " +
"channel_type=VALUES(channel_type)" +
"</script>";
}
}

View File

@ -0,0 +1,46 @@
package com.genersoft.iot.vmp.gb28181.dao.provider;
import java.util.Map;
public class MysqlCommonChannelUpsertProvider {
public String batchUpsert(Map<String, Object> params) {
return "<script>" +
"INSERT INTO wvp_device_channel (" +
"gb_device_id, data_type, data_device_id, create_time, update_time, " +
"gb_name, gb_manufacturer, gb_model, gb_owner, gb_civil_code, gb_block, gb_address, " +
"gb_parental, gb_parent_id, gb_safety_way, gb_register_way, gb_cert_num, gb_certifiable, " +
"gb_err_code, gb_end_time, gb_secrecy, gb_ip_address, gb_port, gb_password, gb_status, " +
"gb_longitude, gb_latitude, gb_ptz_type, gb_position_type, gb_room_type, gb_use_type, " +
"gb_supply_light_type, gb_direction_type, gb_resolution, gb_business_group_id, " +
"gb_download_speed, gb_svc_space_support_mod, gb_svc_time_support_mode, enable_broadcast" +
") VALUES " +
"<foreach collection='channels' item='item' separator=','>" +
"(#{item.gbDeviceId}, #{item.dataType}, #{item.dataDeviceId}, #{item.createTime}, #{item.updateTime}," +
"#{item.gbName}, #{item.gbManufacturer}, #{item.gbModel}, #{item.gbOwner}, #{item.gbCivilCode}, #{item.gbBlock}, #{item.gbAddress}," +
"#{item.gbParental}, #{item.gbParentId}, #{item.gbSafetyWay}, #{item.gbRegisterWay}, #{item.gbCertNum}, #{item.gbCertifiable}," +
"#{item.gbErrCode}, #{item.gbEndTime}, #{item.gbSecrecy}, #{item.gbIpAddress}, #{item.gbPort}, #{item.gbPassword}, #{item.gbStatus}," +
"#{item.gbLongitude}, #{item.gbLatitude}, #{item.gbPtzType}, #{item.gbPositionType}, #{item.gbRoomType}, #{item.gbUseType}," +
"#{item.gbSupplyLightType}, #{item.gbDirectionType}, #{item.gbResolution}, #{item.gbBusinessGroupId}," +
"#{item.gbDownloadSpeed}, #{item.gbSvcSpaceSupportMod}, #{item.gbSvcTimeSupportMode}, #{item.enableBroadcast})" +
"</foreach>" +
"ON DUPLICATE KEY UPDATE " +
"gb_name=VALUES(gb_name), gb_manufacturer=VALUES(gb_manufacturer), gb_model=VALUES(gb_model), " +
"gb_owner=VALUES(gb_owner), gb_civil_code=VALUES(gb_civil_code), gb_block=VALUES(gb_block), " +
"gb_address=VALUES(gb_address), gb_parental=VALUES(gb_parental), gb_parent_id=VALUES(gb_parent_id), " +
"gb_safety_way=VALUES(gb_safety_way), gb_register_way=VALUES(gb_register_way), " +
"gb_cert_num=VALUES(gb_cert_num), gb_certifiable=VALUES(gb_certifiable), " +
"gb_err_code=VALUES(gb_err_code), gb_end_time=VALUES(gb_end_time), gb_secrecy=VALUES(gb_secrecy), " +
"gb_ip_address=VALUES(gb_ip_address), gb_port=VALUES(gb_port), gb_password=VALUES(gb_password), " +
"gb_status=VALUES(gb_status), gb_longitude=VALUES(gb_longitude), gb_latitude=VALUES(gb_latitude), " +
"gb_ptz_type=VALUES(gb_ptz_type), gb_position_type=VALUES(gb_position_type), " +
"gb_room_type=VALUES(gb_room_type), gb_use_type=VALUES(gb_use_type), " +
"gb_supply_light_type=VALUES(gb_supply_light_type), gb_direction_type=VALUES(gb_direction_type), " +
"gb_resolution=VALUES(gb_resolution), gb_business_group_id=VALUES(gb_business_group_id), " +
"gb_download_speed=VALUES(gb_download_speed), " +
"gb_svc_space_support_mod=VALUES(gb_svc_space_support_mod), " +
"gb_svc_time_support_mode=VALUES(gb_svc_time_support_mode), " +
"enable_broadcast=VALUES(enable_broadcast), update_time=VALUES(update_time)" +
"</script>";
}
}

View File

@ -0,0 +1,46 @@
package com.genersoft.iot.vmp.gb28181.dao.provider;
import java.util.Map;
public class PostgresStyleCommonChannelUpsertProvider {
public String batchUpsert(Map<String, Object> params) {
return "<script>" +
"INSERT INTO wvp_device_channel (" +
"gb_device_id, data_type, data_device_id, create_time, update_time, " +
"gb_name, gb_manufacturer, gb_model, gb_owner, gb_civil_code, gb_block, gb_address, " +
"gb_parental, gb_parent_id, gb_safety_way, gb_register_way, gb_cert_num, gb_certifiable, " +
"gb_err_code, gb_end_time, gb_secrecy, gb_ip_address, gb_port, gb_password, gb_status, " +
"gb_longitude, gb_latitude, gb_ptz_type, gb_position_type, gb_room_type, gb_use_type, " +
"gb_supply_light_type, gb_direction_type, gb_resolution, gb_business_group_id, " +
"gb_download_speed, gb_svc_space_support_mod, gb_svc_time_support_mode, enable_broadcast" +
") VALUES " +
"<foreach collection='channels' item='item' separator=','>" +
"(#{item.gbDeviceId}, #{item.dataType}, #{item.dataDeviceId}, #{item.createTime}, #{item.updateTime}," +
"#{item.gbName}, #{item.gbManufacturer}, #{item.gbModel}, #{item.gbOwner}, #{item.gbCivilCode}, #{item.gbBlock}, #{item.gbAddress}," +
"#{item.gbParental}, #{item.gbParentId}, #{item.gbSafetyWay}, #{item.gbRegisterWay}, #{item.gbCertNum}, #{item.gbCertifiable}," +
"#{item.gbErrCode}, #{item.gbEndTime}, #{item.gbSecrecy}, #{item.gbIpAddress}, #{item.gbPort}, #{item.gbPassword}, #{item.gbStatus}," +
"#{item.gbLongitude}, #{item.gbLatitude}, #{item.gbPtzType}, #{item.gbPositionType}, #{item.gbRoomType}, #{item.gbUseType}," +
"#{item.gbSupplyLightType}, #{item.gbDirectionType}, #{item.gbResolution}, #{item.gbBusinessGroupId}," +
"#{item.gbDownloadSpeed}, #{item.gbSvcSpaceSupportMod}, #{item.gbSvcTimeSupportMode}, #{item.enableBroadcast})" +
"</foreach>" +
"ON CONFLICT (gb_device_id) DO UPDATE SET " +
"gb_name=EXCLUDED.gb_name, gb_manufacturer=EXCLUDED.gb_manufacturer, gb_model=EXCLUDED.gb_model, " +
"gb_owner=EXCLUDED.gb_owner, gb_civil_code=EXCLUDED.gb_civil_code, gb_block=EXCLUDED.gb_block, " +
"gb_address=EXCLUDED.gb_address, gb_parental=EXCLUDED.gb_parental, gb_parent_id=EXCLUDED.gb_parent_id, " +
"gb_safety_way=EXCLUDED.gb_safety_way, gb_register_way=EXCLUDED.gb_register_way, " +
"gb_cert_num=EXCLUDED.gb_cert_num, gb_certifiable=EXCLUDED.gb_certifiable, " +
"gb_err_code=EXCLUDED.gb_err_code, gb_end_time=EXCLUDED.gb_end_time, gb_secrecy=EXCLUDED.gb_secrecy, " +
"gb_ip_address=EXCLUDED.gb_ip_address, gb_port=EXCLUDED.gb_port, gb_password=EXCLUDED.gb_password, " +
"gb_status=EXCLUDED.gb_status, gb_longitude=EXCLUDED.gb_longitude, gb_latitude=EXCLUDED.gb_latitude, " +
"gb_ptz_type=EXCLUDED.gb_ptz_type, gb_position_type=EXCLUDED.gb_position_type, " +
"gb_room_type=EXCLUDED.gb_room_type, gb_use_type=EXCLUDED.gb_use_type, " +
"gb_supply_light_type=EXCLUDED.gb_supply_light_type, gb_direction_type=EXCLUDED.gb_direction_type, " +
"gb_resolution=EXCLUDED.gb_resolution, gb_business_group_id=EXCLUDED.gb_business_group_id, " +
"gb_download_speed=EXCLUDED.gb_download_speed, " +
"gb_svc_space_support_mod=EXCLUDED.gb_svc_space_support_mod, " +
"gb_svc_time_support_mode=EXCLUDED.gb_svc_time_support_mode, " +
"enable_broadcast=EXCLUDED.enable_broadcast, update_time=EXCLUDED.update_time" +
"</script>";
}
}

View File

@ -0,0 +1,43 @@
package com.genersoft.iot.vmp.gb28181.dao.provider;
import java.util.Map;
public class PostgresStyleUpsertProvider {
public String batchUpsert(Map<String, Object> params) {
return "<script>" +
"INSERT INTO wvp_device_channel (" +
"device_id, data_type, data_device_id, name, manufacturer, model, owner, civil_code, block, " +
"address, parental, parent_id, safety_way, register_way, cert_num, certifiable, err_code, end_time, secrecy, " +
"ip_address, port, password, status, longitude, latitude, ptz_type, position_type, room_type, use_type, " +
"supply_light_type, direction_type, resolution, business_group_id, download_speed, svc_space_support_mod, " +
"svc_time_support_mode, create_time, update_time, sub_count, stream_id, has_audio, gps_time, stream_identification, channel_type" +
") VALUES " +
"<foreach collection='channels' item='item' separator=','>" +
"(#{item.deviceId}, #{item.dataType}, #{item.dataDeviceId}, #{item.name}, #{item.manufacturer}, #{item.model}, #{item.owner}, #{item.civilCode}, #{item.block}, " +
"#{item.address}, #{item.parental}, #{item.parentId}, #{item.safetyWay}, #{item.registerWay}, #{item.certNum}, #{item.certifiable}, #{item.errCode}, #{item.endTime}, #{item.secrecy}, " +
"#{item.ipAddress}, #{item.port}, #{item.password}, #{item.status}, #{item.longitude}, #{item.latitude}, #{item.ptzType}, #{item.positionType}, #{item.roomType}, #{item.useType}, " +
"#{item.supplyLightType}, #{item.directionType}, #{item.resolution}, #{item.businessGroupId}, #{item.downloadSpeed}, #{item.svcSpaceSupportMod}," +
" #{item.svcTimeSupportMode}, #{item.createTime}, #{item.updateTime}, #{item.subCount}, #{item.streamId}, #{item.hasAudio}, #{item.gpsTime}, #{item.streamIdentification}, #{item.channelType})" +
"</foreach>" +
"ON CONFLICT (data_device_id, device_id) DO UPDATE SET " +
"name=EXCLUDED.name, manufacturer=EXCLUDED.manufacturer, model=EXCLUDED.model, owner=EXCLUDED.owner, " +
"civil_code=EXCLUDED.civil_code, block=EXCLUDED.block, address=EXCLUDED.address, " +
"parental=EXCLUDED.parental, parent_id=EXCLUDED.parent_id, safety_way=EXCLUDED.safety_way, " +
"register_way=EXCLUDED.register_way, cert_num=EXCLUDED.cert_num, certifiable=EXCLUDED.certifiable, " +
"err_code=EXCLUDED.err_code, end_time=EXCLUDED.end_time, secrecy=EXCLUDED.secrecy, " +
"ip_address=EXCLUDED.ip_address, port=EXCLUDED.port, password=EXCLUDED.password, " +
"status=EXCLUDED.status, longitude=EXCLUDED.longitude, latitude=EXCLUDED.latitude, " +
"ptz_type=EXCLUDED.ptz_type, position_type=EXCLUDED.position_type, room_type=EXCLUDED.room_type, " +
"use_type=EXCLUDED.use_type, supply_light_type=EXCLUDED.supply_light_type, " +
"direction_type=EXCLUDED.direction_type, resolution=EXCLUDED.resolution, " +
"business_group_id=EXCLUDED.business_group_id, download_speed=EXCLUDED.download_speed, " +
"svc_space_support_mod=EXCLUDED.svc_space_support_mod, " +
"svc_time_support_mode=EXCLUDED.svc_time_support_mode, " +
"update_time=EXCLUDED.update_time, sub_count=EXCLUDED.sub_count, " +
"stream_id=EXCLUDED.stream_id, has_audio=EXCLUDED.has_audio, " +
"gps_time=EXCLUDED.gps_time, stream_identification=EXCLUDED.stream_identification, " +
"channel_type=EXCLUDED.channel_type" +
"</script>";
}
}

View File

@ -114,75 +114,58 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
if (CollectionUtils.isEmpty(channels)) {
return 0;
}
List<DeviceChannel> addChannels = new ArrayList<>();
List<DeviceChannel> updateChannels = new ArrayList<>();
HashMap<String, DeviceChannel> channelsInStore = new HashMap<>();
// 入参去重
Set<String> dedupSet = new HashSet<>();
List<DeviceChannel> uniqueChannels = new ArrayList<>();
for (DeviceChannel ch : channels) {
if (dedupSet.add(ch.getDeviceId())) {
uniqueChannels.add(ch);
}
}
List<DeviceChannel> upsertChannels = new ArrayList<>();
int result = 0;
List<DeviceChannel> channelList = channelMapper.queryChannelsByDeviceDbId(device.getId());
if (channelList.isEmpty()) {
for (DeviceChannel channel : channels) {
String now = DateUtil.getNow();
for (DeviceChannel channel : uniqueChannels) {
channel.setDataDeviceId(device.getId());
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId());
if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
channel.setStreamId(inviteInfo.getStreamInfo().getStream());
}
String now = DateUtil.getNow();
channel.setUpdateTime(now);
channel.setCreateTime(now);
addChannels.add(channel);
upsertChannels.add(channel);
}
}else {
} else {
HashMap<String, DeviceChannel> channelsInStore = new HashMap<>();
for (DeviceChannel deviceChannel : channelList) {
channelsInStore.put(deviceChannel.getDataDeviceId() + deviceChannel.getDeviceId(), deviceChannel);
}
for (DeviceChannel channel : channels) {
String now = DateUtil.getNow();
for (DeviceChannel channel : uniqueChannels) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId());
if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
channel.setStreamId(inviteInfo.getStreamInfo().getStream());
}
String now = DateUtil.getNow();
channel.setUpdateTime(now);
DeviceChannel deviceChannelInDb = channelsInStore.get(channel.getDataDeviceId() + channel.getDeviceId());
if ( deviceChannelInDb != null) {
if (deviceChannelInDb != null) {
channel.setId(deviceChannelInDb.getId());
channel.setUpdateTime(now);
updateChannels.add(channel);
}else {
channel.setCreateTime(deviceChannelInDb.getCreateTime());
} else {
channel.setCreateTime(now);
channel.setUpdateTime(now);
addChannels.add(channel);
}
upsertChannels.add(channel);
}
}
Set<String> channelSet = new HashSet<>();
// 滤重
List<DeviceChannel> addChannelList = new ArrayList<>();
List<DeviceChannel> updateChannelList = new ArrayList<>();
addChannels.forEach(channel -> {
if (channelSet.add(channel.getDeviceId())) {
addChannelList.add(channel);
}
});
channelSet.clear();
updateChannels.forEach(channel -> {
if (channelSet.add(channel.getDeviceId())) {
updateChannelList.add(channel);
}
});
int limitCount = 500;
if (!addChannelList.isEmpty()) {
for (int i = 0; i < addChannelList.size(); i += limitCount) {
int end = Math.min(i + limitCount, addChannelList.size());
List<DeviceChannel> batchList = addChannelList.subList(i, end);
result += channelMapper.batchAdd(batchList);
}
}
if (!updateChannelList.isEmpty()) {
for (int i = 0; i < updateChannelList.size(); i += limitCount) {
int end = Math.min(i + limitCount, updateChannelList.size());
List<DeviceChannel> batchList = updateChannelList.subList(i, end);
result += channelMapper.batchUpdate(batchList);
if (!upsertChannels.isEmpty()) {
for (int i = 0; i < upsertChannels.size(); i += limitCount) {
int end = Math.min(i + limitCount, upsertChannels.size());
List<DeviceChannel> batchList = upsertChannels.subList(i, end);
result += channelMapper.batchUpsert(batchList);
}
}
return result;
@ -385,21 +368,26 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
allChannelMap.put(deviceChannel.getDataDeviceId() + deviceChannel.getDeviceId(), deviceChannel);
}
}
// 数据去重
List<DeviceChannel> channels = new ArrayList<>();
// 入参去重
Set<String> dedupSet = new HashSet<>();
List<DeviceChannel> uniqueChannels = new ArrayList<>();
for (DeviceChannel ch : deviceChannelList) {
if (dedupSet.add(ch.getDeviceId())) {
uniqueChannels.add(ch);
}
}
List<DeviceChannel> updateChannels = new ArrayList<>();
List<DeviceChannel> addChannels = new ArrayList<>();
List<DeviceChannel> upsertChannels = new ArrayList<>();
List<DeviceChannel> deleteChannels = new ArrayList<>();
StringBuilder stringBuilder = new StringBuilder();
Map<String, Integer> subContMap = new HashMap<>();
for (DeviceChannel deviceChannel : deviceChannelList) {
for (DeviceChannel deviceChannel : uniqueChannels) {
DeviceChannel channelInDb = allChannelMap.get(deviceChannel.getDataDeviceId() + deviceChannel.getDeviceId());
if (channelInDb != null) {
deviceChannel.setStreamId(channelInDb.getStreamId());
deviceChannel.setHasAudio(channelInDb.isHasAudio());
deviceChannel.setId(channelInDb.getId());
deviceChannel.setCreateTime(channelInDb.getCreateTime());
if (channelInDb.getStatus() != null && !channelInDb.getStatus().equalsIgnoreCase(deviceChannel.getStatus())){
List<Platform> platformList = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getDeviceId());
if (!CollectionUtils.isEmpty(platformList)){
@ -409,60 +397,45 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
}
}
deviceChannel.setUpdateTime(DateUtil.getNow());
updateChannels.add(deviceChannel);
}else {
} else {
deviceChannel.setCreateTime(DateUtil.getNow());
deviceChannel.setUpdateTime(DateUtil.getNow());
addChannels.add(deviceChannel);
}
allChannelMap.remove(deviceChannel.getDataDeviceId() + deviceChannel.getDeviceId());
channels.add(deviceChannel);
upsertChannels.add(deviceChannel);
if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) {
if (subContMap.get(deviceChannel.getParentId()) == null) {
subContMap.put(deviceChannel.getParentId(), 1);
}else {
} else {
Integer count = subContMap.get(deviceChannel.getParentId());
subContMap.put(deviceChannel.getParentId(), count++);
}
}
}
deleteChannels.addAll(allChannelMap.values());
if (!channels.isEmpty()) {
for (DeviceChannel channel : channels) {
if (subContMap.get(channel.getDeviceId()) != null){
Integer count = subContMap.get(channel.getDeviceId());
if (count > 0) {
channel.setSubCount(count);
channel.setParental(1);
}
for (DeviceChannel channel : upsertChannels) {
if (subContMap.get(channel.getDeviceId()) != null){
Integer count = subContMap.get(channel.getDeviceId());
if (count > 0) {
channel.setSubCount(count);
channel.setParental(1);
}
}
}
if (stringBuilder.length() > 0) {
log.info("[目录查询]收到的数据存在重复: {}" , stringBuilder);
}
if(CollectionUtils.isEmpty(channels)){
if(CollectionUtils.isEmpty(upsertChannels)){
log.info("通道重设,数据为空={}" , deviceChannelList);
return false;
}
int limitCount = 500;
if (!addChannels.isEmpty()) {
for (int i = 0; i < addChannels.size(); i += limitCount) {
int end = Math.min(i + limitCount, addChannels.size());
List<DeviceChannel> batchList = addChannels.subList(i, end);
channelMapper.batchAdd(batchList);
if (!upsertChannels.isEmpty()) {
for (int i = 0; i < upsertChannels.size(); i += limitCount) {
int end = Math.min(i + limitCount, upsertChannels.size());
List<DeviceChannel> batchList = upsertChannels.subList(i, end);
channelMapper.batchUpsert(batchList);
}
}
if (!updateChannels.isEmpty()) {
for (int i = 0; i < updateChannels.size(); i += limitCount) {
int end = Math.min(i + limitCount, updateChannels.size());
List<DeviceChannel> batchList = updateChannels.subList(i, end);
channelMapper.batchUpdate(batchList);
}
// 不对收到的通道做比较已确定是否真的发生变化所以不发送更新通知
}
if (!deleteChannels.isEmpty()) {
try {
// 这些通道可能关联了上级平台需要删除同时发送消息

View File

@ -325,13 +325,13 @@ public class GbChannelServiceImpl implements IGbChannelService {
log.warn("[新增多个通道] 通道数量为0更新失败");
return;
}
// 批量保存
// 批量保存使用UPSERT防重复
int limitCount = 1000;
int result = 0;
for (int i = 0; i < commonGBChannels.size(); i += limitCount) {
int end = Math.min(i + limitCount, commonGBChannels.size());
List<CommonGBChannel> batchList = commonGBChannels.subList(i, end);
result += commonGBChannelMapper.batchAdd(batchList);
result += commonGBChannelMapper.batchUpsert(batchList);
}
try {
// 发送catalog

View File

@ -19,6 +19,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
@Component
@ -38,6 +39,16 @@ public class CatalogDataManager{
private final Map<String, CatalogData> dataMap = new ConcurrentHashMap<>();
private final Map<String, ReentrantLock> deviceWriteLocks = new ConcurrentHashMap<>();
public ReentrantLock getDeviceWriteLock(String deviceId) {
return deviceWriteLocks.computeIfAbsent(deviceId, k -> new ReentrantLock());
}
public void removeDeviceWriteLock(String deviceId) {
deviceWriteLocks.remove(deviceId);
}
private final String key = "VMP_CATALOG_DATA";
public String buildMapKey(String deviceId, int sn ) {
@ -240,12 +251,17 @@ public class CatalogDataManager{
}else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) {
if ( catalogData.getTime().isBefore(instantBefore5S)) {
String deviceId = catalogData.getDevice().getDeviceId();
int sn = catalogData.getSn();
List<DeviceChannel> deviceChannelList = getDeviceChannelList(deviceId, sn);
ReentrantLock lock = getDeviceWriteLock(deviceId);
if (!lock.tryLock()) {
// saveData() 正在执行跳过本次等下一个5s周期
continue;
}
try {
int sn = catalogData.getSn();
List<DeviceChannel> deviceChannelList = getDeviceChannelList(deviceId, sn);
if (catalogData.getTotal() == deviceChannelList.size()) {
deviceChannelService.resetChannels(catalogData.getDevice().getId(), deviceChannelList);
}else {
} else {
deviceChannelService.updateChannels(catalogData.getDevice(), deviceChannelList);
}
List<Region> regionList = getRegionList(deviceId, sn);
@ -256,16 +272,23 @@ public class CatalogDataManager{
if (groupList != null && !groupList.isEmpty()) {
groupService.batchAdd(groupList);
}
}catch (Exception e) {
String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + deviceChannelList.size() + "";
catalogData.setErrorMsg(errorMsg);
} catch (Exception e) {
log.error("[国标通道同步] 入库失败: ", e);
} finally {
lock.unlock();
}
String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + deviceChannelList.size() + "";
catalogData.setErrorMsg(errorMsg);
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
}
}else {
if (catalogData.getTime().isBefore(instantBefore30S)) {
String deviceId = catalogData.getDevice().getDeviceId();
dataMap.remove(dataKey);
// 清理可能残留的设备锁
if (deviceWriteLocks.containsKey(deviceId)) {
deviceWriteLocks.remove(deviceId);
}
Set<String> redisKeysForChannel = catalogData.getRedisKeysForChannel();
if (redisKeysForChannel != null && !redisKeysForChannel.isEmpty()) {
for (String deleteKey : redisKeysForChannel) {

View File

@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.lang.Thread;
/**
@ -163,16 +164,22 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
String deviceId = device.getDeviceId();
if (catalogDataCatch.size(deviceId, sn) > 0
&& catalogDataCatch.size(deviceId, sn) == catalogDataCatch.sumNum(deviceId, sn)) {
// 数据已经完整接收 此时可能存在某个设备离线变上线的情况但是考虑到性能此处不做处理
// 数据已经完整接收 此时可能存在某个设备离线变上线的情况但是考虑到性能此处不做处理
// 目前支持设备通道上线通知时和设备上线时向上级通知
int finalSn = sn;
Thread.startVirtualThread(() -> {
boolean resetChannelsResult = saveData(device, finalSn);
if (!resetChannelsResult) {
String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(deviceId, finalSn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(device.getDeviceId(), finalSn).size() + "";
catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, errorMsg);
} else {
catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, null);
ReentrantLock lock = catalogDataCatch.getDeviceWriteLock(device.getDeviceId());
lock.lock();
try {
boolean resetChannelsResult = saveData(device, finalSn);
if (!resetChannelsResult) {
String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(deviceId, finalSn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(device.getDeviceId(), finalSn).size() + "";
catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, errorMsg);
} else {
catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, null);
}
} finally {
lock.unlock();
}
});
}

View File

@ -155,7 +155,8 @@ create table IF NOT EXISTS wvp_device_channel
gps_speed double precision,
gps_altitude double precision,
gps_direction double precision,
constraint uk_wvp_unique_channel unique (gb_device_id)
constraint uk_wvp_unique_channel unique (gb_device_id),
constraint uk_device_channel_source unique (data_device_id, device_id)
);
create table IF NOT EXISTS wvp_media_server

View File

@ -162,7 +162,8 @@ create table IF NOT EXISTS wvp_device_channel
enable_broadcast integer default 0 COMMENT '是否支持广播',
index (data_type),
index (data_device_id),
constraint uk_wvp_unique_channel unique (gb_device_id)
constraint uk_wvp_unique_channel unique (gb_device_id),
constraint uk_device_channel_source unique (data_device_id, device_id)
);
-- 媒体服务器(如 ZLM节点信息

View File

@ -215,7 +215,8 @@ create table IF NOT EXISTS wvp_device_channel
gps_altitude double precision,
gps_direction double precision,
enable_broadcast integer default 0,
constraint uk_wvp_unique_channel unique (gb_device_id)
constraint uk_wvp_unique_channel unique (gb_device_id),
constraint uk_device_channel_source unique (data_device_id, device_id)
);
COMMENT ON TABLE wvp_device_channel IS '保存设备下的通道信息以及扩展属性';
COMMENT ON COLUMN wvp_device_channel.id IS '主键ID';

View File

@ -196,7 +196,29 @@ call wvp_20260417();
DROP PROCEDURE wvp_20260417;
DELIMITER ;
/*
* 20260521 wvp_device_channel唯一约束
*/
DELIMITER //
CREATE PROCEDURE `wvp_20260521`()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.STATISTICS
WHERE TABLE_SCHEMA = (SELECT DATABASE())
AND TABLE_NAME = 'wvp_device_channel'
AND INDEX_NAME = 'uk_device_channel_source')
THEN
-- 先清理可能的重复数据
DELETE t1 FROM wvp_device_channel t1
INNER JOIN wvp_device_channel t2
WHERE t1.id < t2.id
AND t1.data_device_id = t2.data_device_id
AND t1.device_id = t2.device_id;
ALTER TABLE wvp_device_channel ADD UNIQUE INDEX uk_device_channel_source (data_device_id, device_id);
END IF;
END; //
call wvp_20260521();
DROP PROCEDURE wvp_20260521;
DELIMITER ;

View File

@ -68,6 +68,26 @@ COMMENT ON COLUMN wvp_alarm.alarm_type IS '报警类别';
COMMENT ON COLUMN wvp_alarm.alarm_time IS '报警时间';
/*
* 20260521 wvp_device_channel唯一约束
*/
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_indexes
WHERE tablename = 'wvp_device_channel'
AND indexname = 'uk_device_channel_source') THEN
-- 先清理可能的重复数据
DELETE FROM wvp_device_channel t1
USING wvp_device_channel t2
WHERE t1.id < t2.id
AND t1.data_device_id = t2.data_device_id
AND t1.device_id = t2.device_id;
ALTER TABLE wvp_device_channel ADD CONSTRAINT uk_device_channel_source UNIQUE (data_device_id, device_id);
END IF;
END;
$$;
/*
* 20260417 wvp_device_mobile_position从专属国标的位置记录表
*/