diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java index a264cf747..fabc2e9e5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java @@ -3,6 +3,9 @@ package com.genersoft.iot.vmp.gb28181.dao; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.controller.bean.Extent; import com.genersoft.iot.vmp.gb28181.dao.provider.ChannelProvider; +import com.genersoft.iot.vmp.gb28181.dao.provider.MysqlCommonChannelUpsertProvider; +import com.genersoft.iot.vmp.gb28181.dao.provider.PostgresStyleCommonChannelUpsertProvider; +import com.genersoft.iot.vmp.gb28181.dao.provider.H2CommonChannelUpsertProvider; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.streamPush.bean.StreamPush; import com.genersoft.iot.vmp.web.custom.bean.CameraChannel; @@ -173,6 +176,12 @@ public interface CommonGBChannelMapper { List queryInListByStatus(List commonGBChannelList, @Param("status") String status); + @InsertProvider(type = MysqlCommonChannelUpsertProvider.class, method = "batchUpsert", databaseId = "mysql") + @InsertProvider(type = PostgresStyleCommonChannelUpsertProvider.class, method = "batchUpsert", databaseId = "postgresql") + @InsertProvider(type = PostgresStyleCommonChannelUpsertProvider.class, method = "batchUpsert", databaseId = "kingbase") + @InsertProvider(type = H2CommonChannelUpsertProvider.class, method = "batchUpsert", databaseId = "h2") + int batchUpsert(@Param("channels") List channels); + @Insert(" "; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/H2CommonChannelUpsertProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/H2CommonChannelUpsertProvider.java new file mode 100644 index 000000000..58b786f83 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/H2CommonChannelUpsertProvider.java @@ -0,0 +1,29 @@ +package com.genersoft.iot.vmp.gb28181.dao.provider; + +import java.util.Map; + +public class H2CommonChannelUpsertProvider { + + public String batchUpsert(Map params) { + return ""; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/MysqlChannelUpsertProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/MysqlChannelUpsertProvider.java new file mode 100644 index 000000000..238d3288c --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/MysqlChannelUpsertProvider.java @@ -0,0 +1,43 @@ +package com.genersoft.iot.vmp.gb28181.dao.provider; + +import java.util.Map; + +public class MysqlChannelUpsertProvider { + + public String batchUpsert(Map params) { + return ""; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/MysqlCommonChannelUpsertProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/MysqlCommonChannelUpsertProvider.java new file mode 100644 index 000000000..a9275d6a1 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/MysqlCommonChannelUpsertProvider.java @@ -0,0 +1,46 @@ +package com.genersoft.iot.vmp.gb28181.dao.provider; + +import java.util.Map; + +public class MysqlCommonChannelUpsertProvider { + + public String batchUpsert(Map params) { + return ""; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/PostgresStyleCommonChannelUpsertProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/PostgresStyleCommonChannelUpsertProvider.java new file mode 100644 index 000000000..dc5132bf1 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/PostgresStyleCommonChannelUpsertProvider.java @@ -0,0 +1,46 @@ +package com.genersoft.iot.vmp.gb28181.dao.provider; + +import java.util.Map; + +public class PostgresStyleCommonChannelUpsertProvider { + + public String batchUpsert(Map params) { + return ""; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/PostgresStyleUpsertProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/PostgresStyleUpsertProvider.java new file mode 100644 index 000000000..4f2cbfcaa --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/PostgresStyleUpsertProvider.java @@ -0,0 +1,43 @@ +package com.genersoft.iot.vmp.gb28181.dao.provider; + +import java.util.Map; + +public class PostgresStyleUpsertProvider { + + public String batchUpsert(Map params) { + return ""; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java index 2a1ecbacc..cc5c46166 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java @@ -114,75 +114,58 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { if (CollectionUtils.isEmpty(channels)) { return 0; } - List addChannels = new ArrayList<>(); - List updateChannels = new ArrayList<>(); - HashMap channelsInStore = new HashMap<>(); + // 入参去重 + Set dedupSet = new HashSet<>(); + List uniqueChannels = new ArrayList<>(); + for (DeviceChannel ch : channels) { + if (dedupSet.add(ch.getDeviceId())) { + uniqueChannels.add(ch); + } + } + List upsertChannels = new ArrayList<>(); int result = 0; List channelList = channelMapper.queryChannelsByDeviceDbId(device.getId()); if (channelList.isEmpty()) { - for (DeviceChannel channel : channels) { + String now = DateUtil.getNow(); + for (DeviceChannel channel : uniqueChannels) { channel.setDataDeviceId(device.getId()); InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId()); if (inviteInfo != null && inviteInfo.getStreamInfo() != null) { channel.setStreamId(inviteInfo.getStreamInfo().getStream()); } - String now = DateUtil.getNow(); channel.setUpdateTime(now); channel.setCreateTime(now); - addChannels.add(channel); + upsertChannels.add(channel); } - }else { + } else { + HashMap channelsInStore = new HashMap<>(); for (DeviceChannel deviceChannel : channelList) { channelsInStore.put(deviceChannel.getDataDeviceId() + deviceChannel.getDeviceId(), deviceChannel); } - for (DeviceChannel channel : channels) { + String now = DateUtil.getNow(); + for (DeviceChannel channel : uniqueChannels) { InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId()); if (inviteInfo != null && inviteInfo.getStreamInfo() != null) { channel.setStreamId(inviteInfo.getStreamInfo().getStream()); } - String now = DateUtil.getNow(); channel.setUpdateTime(now); DeviceChannel deviceChannelInDb = channelsInStore.get(channel.getDataDeviceId() + channel.getDeviceId()); - if ( deviceChannelInDb != null) { + if (deviceChannelInDb != null) { channel.setId(deviceChannelInDb.getId()); - channel.setUpdateTime(now); - updateChannels.add(channel); - }else { + channel.setCreateTime(deviceChannelInDb.getCreateTime()); + } else { channel.setCreateTime(now); - channel.setUpdateTime(now); - addChannels.add(channel); } + upsertChannels.add(channel); } } - Set channelSet = new HashSet<>(); - // 滤重 - List addChannelList = new ArrayList<>(); - List updateChannelList = new ArrayList<>(); - addChannels.forEach(channel -> { - if (channelSet.add(channel.getDeviceId())) { - addChannelList.add(channel); - } - }); - channelSet.clear(); - updateChannels.forEach(channel -> { - if (channelSet.add(channel.getDeviceId())) { - updateChannelList.add(channel); - } - }); int limitCount = 500; - if (!addChannelList.isEmpty()) { - for (int i = 0; i < addChannelList.size(); i += limitCount) { - int end = Math.min(i + limitCount, addChannelList.size()); - List batchList = addChannelList.subList(i, end); - result += channelMapper.batchAdd(batchList); - } - } - if (!updateChannelList.isEmpty()) { - for (int i = 0; i < updateChannelList.size(); i += limitCount) { - int end = Math.min(i + limitCount, updateChannelList.size()); - List batchList = updateChannelList.subList(i, end); - result += channelMapper.batchUpdate(batchList); + if (!upsertChannels.isEmpty()) { + for (int i = 0; i < upsertChannels.size(); i += limitCount) { + int end = Math.min(i + limitCount, upsertChannels.size()); + List batchList = upsertChannels.subList(i, end); + result += channelMapper.batchUpsert(batchList); } } return result; @@ -385,21 +368,26 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { allChannelMap.put(deviceChannel.getDataDeviceId() + deviceChannel.getDeviceId(), deviceChannel); } } - // 数据去重 - List channels = new ArrayList<>(); + // 入参去重 + Set dedupSet = new HashSet<>(); + List uniqueChannels = new ArrayList<>(); + for (DeviceChannel ch : deviceChannelList) { + if (dedupSet.add(ch.getDeviceId())) { + uniqueChannels.add(ch); + } + } - List updateChannels = new ArrayList<>(); - List addChannels = new ArrayList<>(); + List upsertChannels = new ArrayList<>(); List deleteChannels = new ArrayList<>(); - StringBuilder stringBuilder = new StringBuilder(); Map subContMap = new HashMap<>(); - for (DeviceChannel deviceChannel : deviceChannelList) { + for (DeviceChannel deviceChannel : uniqueChannels) { DeviceChannel channelInDb = allChannelMap.get(deviceChannel.getDataDeviceId() + deviceChannel.getDeviceId()); if (channelInDb != null) { deviceChannel.setStreamId(channelInDb.getStreamId()); deviceChannel.setHasAudio(channelInDb.isHasAudio()); deviceChannel.setId(channelInDb.getId()); + deviceChannel.setCreateTime(channelInDb.getCreateTime()); if (channelInDb.getStatus() != null && !channelInDb.getStatus().equalsIgnoreCase(deviceChannel.getStatus())){ List platformList = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getDeviceId()); if (!CollectionUtils.isEmpty(platformList)){ @@ -409,60 +397,45 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { } } deviceChannel.setUpdateTime(DateUtil.getNow()); - updateChannels.add(deviceChannel); - }else { + } else { deviceChannel.setCreateTime(DateUtil.getNow()); deviceChannel.setUpdateTime(DateUtil.getNow()); - addChannels.add(deviceChannel); } allChannelMap.remove(deviceChannel.getDataDeviceId() + deviceChannel.getDeviceId()); - channels.add(deviceChannel); + upsertChannels.add(deviceChannel); if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) { if (subContMap.get(deviceChannel.getParentId()) == null) { subContMap.put(deviceChannel.getParentId(), 1); - }else { + } else { Integer count = subContMap.get(deviceChannel.getParentId()); subContMap.put(deviceChannel.getParentId(), count++); } } } deleteChannels.addAll(allChannelMap.values()); - if (!channels.isEmpty()) { - for (DeviceChannel channel : channels) { - if (subContMap.get(channel.getDeviceId()) != null){ - Integer count = subContMap.get(channel.getDeviceId()); - if (count > 0) { - channel.setSubCount(count); - channel.setParental(1); - } + + for (DeviceChannel channel : upsertChannels) { + if (subContMap.get(channel.getDeviceId()) != null){ + Integer count = subContMap.get(channel.getDeviceId()); + if (count > 0) { + channel.setSubCount(count); + channel.setParental(1); } } } - if (stringBuilder.length() > 0) { - log.info("[目录查询]收到的数据存在重复: {}" , stringBuilder); - } - if(CollectionUtils.isEmpty(channels)){ + if(CollectionUtils.isEmpty(upsertChannels)){ log.info("通道重设,数据为空={}" , deviceChannelList); return false; } int limitCount = 500; - if (!addChannels.isEmpty()) { - for (int i = 0; i < addChannels.size(); i += limitCount) { - int end = Math.min(i + limitCount, addChannels.size()); - List batchList = addChannels.subList(i, end); - channelMapper.batchAdd(batchList); + if (!upsertChannels.isEmpty()) { + for (int i = 0; i < upsertChannels.size(); i += limitCount) { + int end = Math.min(i + limitCount, upsertChannels.size()); + List batchList = upsertChannels.subList(i, end); + channelMapper.batchUpsert(batchList); } } - if (!updateChannels.isEmpty()) { - for (int i = 0; i < updateChannels.size(); i += limitCount) { - int end = Math.min(i + limitCount, updateChannels.size()); - List batchList = updateChannels.subList(i, end); - channelMapper.batchUpdate(batchList); - } - // 不对收到的通道做比较,已确定是否真的发生变化,所以不发送更新通知 - - } if (!deleteChannels.isEmpty()) { try { // 这些通道可能关联了,上级平台需要删除同时发送消息 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java index 6a2ec5847..d7ac5ded2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java @@ -325,13 +325,13 @@ public class GbChannelServiceImpl implements IGbChannelService { log.warn("[新增多个通道] 通道数量为0,更新失败"); return; } - // 批量保存 + // 批量保存(使用UPSERT防重复) int limitCount = 1000; int result = 0; for (int i = 0; i < commonGBChannels.size(); i += limitCount) { int end = Math.min(i + limitCount, commonGBChannels.size()); List batchList = commonGBChannels.subList(i, end); - result += commonGBChannelMapper.batchAdd(batchList); + result += commonGBChannelMapper.batchUpsert(batchList); } try { // 发送catalog diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java index 08d3e8903..451467492 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; @Slf4j @Component @@ -38,6 +39,16 @@ public class CatalogDataManager{ private final Map dataMap = new ConcurrentHashMap<>(); + private final Map deviceWriteLocks = new ConcurrentHashMap<>(); + + public ReentrantLock getDeviceWriteLock(String deviceId) { + return deviceWriteLocks.computeIfAbsent(deviceId, k -> new ReentrantLock()); + } + + public void removeDeviceWriteLock(String deviceId) { + deviceWriteLocks.remove(deviceId); + } + private final String key = "VMP_CATALOG_DATA"; public String buildMapKey(String deviceId, int sn ) { @@ -240,12 +251,17 @@ public class CatalogDataManager{ }else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) { if ( catalogData.getTime().isBefore(instantBefore5S)) { String deviceId = catalogData.getDevice().getDeviceId(); - int sn = catalogData.getSn(); - List deviceChannelList = getDeviceChannelList(deviceId, sn); + ReentrantLock lock = getDeviceWriteLock(deviceId); + if (!lock.tryLock()) { + // saveData() 正在执行,跳过本次,等下一个5s周期 + continue; + } try { + int sn = catalogData.getSn(); + List deviceChannelList = getDeviceChannelList(deviceId, sn); if (catalogData.getTotal() == deviceChannelList.size()) { deviceChannelService.resetChannels(catalogData.getDevice().getId(), deviceChannelList); - }else { + } else { deviceChannelService.updateChannels(catalogData.getDevice(), deviceChannelList); } List regionList = getRegionList(deviceId, sn); @@ -256,16 +272,23 @@ public class CatalogDataManager{ if (groupList != null && !groupList.isEmpty()) { groupService.batchAdd(groupList); } - }catch (Exception e) { + String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + deviceChannelList.size() + "条"; + catalogData.setErrorMsg(errorMsg); + } catch (Exception e) { log.error("[国标通道同步] 入库失败: ", e); + } finally { + lock.unlock(); } - String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + deviceChannelList.size() + "条"; - catalogData.setErrorMsg(errorMsg); catalogData.setStatus(CatalogData.CatalogDataStatus.end); } }else { if (catalogData.getTime().isBefore(instantBefore30S)) { + String deviceId = catalogData.getDevice().getDeviceId(); dataMap.remove(dataKey); + // 清理可能残留的设备锁 + if (deviceWriteLocks.containsKey(deviceId)) { + deviceWriteLocks.remove(deviceId); + } Set redisKeysForChannel = catalogData.getRedisKeysForChannel(); if (redisKeysForChannel != null && !redisKeysForChannel.isEmpty()) { for (String deleteKey : redisKeysForChannel) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 6533c4e59..417245d5f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantLock; import java.lang.Thread; /** @@ -163,16 +164,22 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp String deviceId = device.getDeviceId(); if (catalogDataCatch.size(deviceId, sn) > 0 && catalogDataCatch.size(deviceId, sn) == catalogDataCatch.sumNum(deviceId, sn)) { - // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理, + // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理, // 目前支持设备通道上线通知时和设备上线时向上级通知 int finalSn = sn; Thread.startVirtualThread(() -> { - boolean resetChannelsResult = saveData(device, finalSn); - if (!resetChannelsResult) { - String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(deviceId, finalSn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(device.getDeviceId(), finalSn).size() + "条"; - catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, errorMsg); - } else { - catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, null); + ReentrantLock lock = catalogDataCatch.getDeviceWriteLock(device.getDeviceId()); + lock.lock(); + try { + boolean resetChannelsResult = saveData(device, finalSn); + if (!resetChannelsResult) { + String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(deviceId, finalSn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(device.getDeviceId(), finalSn).size() + "条"; + catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, errorMsg); + } else { + catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, null); + } + } finally { + lock.unlock(); } }); } diff --git a/数据库/2.7.4-h2/h2-schema.sql b/数据库/2.7.4-h2/h2-schema.sql index f1bef6daf..f9226024c 100644 --- a/数据库/2.7.4-h2/h2-schema.sql +++ b/数据库/2.7.4-h2/h2-schema.sql @@ -155,7 +155,8 @@ create table IF NOT EXISTS wvp_device_channel gps_speed double precision, gps_altitude double precision, gps_direction double precision, - constraint uk_wvp_unique_channel unique (gb_device_id) + constraint uk_wvp_unique_channel unique (gb_device_id), + constraint uk_device_channel_source unique (data_device_id, device_id) ); create table IF NOT EXISTS wvp_media_server diff --git a/数据库/2.7.4/初始化-mysql-2.7.4.sql b/数据库/2.7.4/初始化-mysql-2.7.4.sql index bfce3678c..acc15d504 100644 --- a/数据库/2.7.4/初始化-mysql-2.7.4.sql +++ b/数据库/2.7.4/初始化-mysql-2.7.4.sql @@ -162,7 +162,8 @@ create table IF NOT EXISTS wvp_device_channel enable_broadcast integer default 0 COMMENT '是否支持广播', index (data_type), index (data_device_id), - constraint uk_wvp_unique_channel unique (gb_device_id) + constraint uk_wvp_unique_channel unique (gb_device_id), + constraint uk_device_channel_source unique (data_device_id, device_id) ); -- 媒体服务器(如 ZLM)节点信息 diff --git a/数据库/2.7.4/初始化-postgresql-kingbase-2.7.4.sql b/数据库/2.7.4/初始化-postgresql-kingbase-2.7.4.sql index b34b04d9a..eba8e3b10 100644 --- a/数据库/2.7.4/初始化-postgresql-kingbase-2.7.4.sql +++ b/数据库/2.7.4/初始化-postgresql-kingbase-2.7.4.sql @@ -215,7 +215,8 @@ create table IF NOT EXISTS wvp_device_channel gps_altitude double precision, gps_direction double precision, enable_broadcast integer default 0, - constraint uk_wvp_unique_channel unique (gb_device_id) + constraint uk_wvp_unique_channel unique (gb_device_id), + constraint uk_device_channel_source unique (data_device_id, device_id) ); COMMENT ON TABLE wvp_device_channel IS '保存设备下的通道信息以及扩展属性'; COMMENT ON COLUMN wvp_device_channel.id IS '主键ID'; diff --git a/数据库/2.7.4/更新-mysql-2.7.4.sql b/数据库/2.7.4/更新-mysql-2.7.4.sql index 645c97021..ace5bc0be 100644 --- a/数据库/2.7.4/更新-mysql-2.7.4.sql +++ b/数据库/2.7.4/更新-mysql-2.7.4.sql @@ -196,7 +196,29 @@ call wvp_20260417(); DROP PROCEDURE wvp_20260417; DELIMITER ; - +/* +* 20260521 添加wvp_device_channel唯一约束,防止通道重复写入 +*/ +DELIMITER // +CREATE PROCEDURE `wvp_20260521`() +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.STATISTICS + WHERE TABLE_SCHEMA = (SELECT DATABASE()) + AND TABLE_NAME = 'wvp_device_channel' + AND INDEX_NAME = 'uk_device_channel_source') + THEN + -- 先清理可能的重复数据 + DELETE t1 FROM wvp_device_channel t1 + INNER JOIN wvp_device_channel t2 + WHERE t1.id < t2.id + AND t1.data_device_id = t2.data_device_id + AND t1.device_id = t2.device_id; +ALTER TABLE wvp_device_channel ADD UNIQUE INDEX uk_device_channel_source (data_device_id, device_id); +END IF; +END; // +call wvp_20260521(); +DROP PROCEDURE wvp_20260521; +DELIMITER ; diff --git a/数据库/2.7.4/更新-postgresql-kingbase-2.7.4.sql b/数据库/2.7.4/更新-postgresql-kingbase-2.7.4.sql index 0ab03ae64..e99a47039 100644 --- a/数据库/2.7.4/更新-postgresql-kingbase-2.7.4.sql +++ b/数据库/2.7.4/更新-postgresql-kingbase-2.7.4.sql @@ -68,6 +68,26 @@ COMMENT ON COLUMN wvp_alarm.alarm_type IS '报警类别'; COMMENT ON COLUMN wvp_alarm.alarm_time IS '报警时间'; +/* +* 20260521 添加wvp_device_channel唯一约束,防止通道重复写入 +*/ +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_indexes + WHERE tablename = 'wvp_device_channel' + AND indexname = 'uk_device_channel_source') THEN + -- 先清理可能的重复数据 + DELETE FROM wvp_device_channel t1 + USING wvp_device_channel t2 + WHERE t1.id < t2.id + AND t1.data_device_id = t2.data_device_id + AND t1.device_id = t2.device_id; + ALTER TABLE wvp_device_channel ADD CONSTRAINT uk_device_channel_source UNIQUE (data_device_id, device_id); + END IF; +END; +$$; + + /* * 20260417 将 wvp_device_mobile_position从专属国标的位置记录表,改为通用通道共用的位置记录表 */