优化目录刷新写入逻辑

This commit is contained in:
lin 2026-06-08 12:39:52 +08:00
parent cc80287124
commit 9aca5aab35
6 changed files with 76 additions and 21 deletions

View File

@ -551,7 +551,7 @@ public interface PlatformChannelMapper {
@Select("<script>" +
" select " +
" wpgc.platform_id as platform_id" +
" wpgc.platform_id as platform_id,\n" +
" wdc.id as gb_id,\n" +
" wdc.data_type,\n" +
" wdc.data_device_id,\n" +

View File

@ -40,6 +40,8 @@ public class CatalogDataManager{
private final Map<String, CatalogData> dataMap = new ConcurrentHashMap<>();
private final Set<String> syncingDevices = ConcurrentHashMap.newKeySet();
private final Map<String, ReentrantLock> deviceWriteLocks = new ConcurrentHashMap<>();
public ReentrantLock getDeviceWriteLock(String deviceId) {
@ -73,6 +75,7 @@ public class CatalogDataManager{
catalogData.setStatus(CatalogData.CatalogDataStatus.ready);
catalogData.setTime(Instant.now());
dataMap.put(buildMapKey(device.getDeviceId(),sn), catalogData);
syncingDevices.add(device.getDeviceId());
}
private void deleteRedisKeys(CatalogData catalogData) {
@ -253,6 +256,7 @@ public class CatalogDataManager{
String errorMsg = "同步失败,等待回复超时";
catalogData.setErrorMsg(errorMsg);
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
syncingDevices.remove(catalogData.getDevice().getDeviceId());
}
}else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) {
boolean complete = catalogData.isComplete();
@ -285,6 +289,7 @@ public class CatalogDataManager{
lock.unlock();
}
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
syncingDevices.remove(deviceId);
}
}else {
if (catalogData.getTime().isBefore(instantBefore30S)) {
@ -294,6 +299,7 @@ public class CatalogDataManager{
if (deviceWriteLocks.containsKey(deviceId)) {
deviceWriteLocks.remove(deviceId);
}
syncingDevices.remove(deviceId);
deleteRedisKeys(catalogData);
}
}
@ -317,6 +323,16 @@ public class CatalogDataManager{
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
catalogData.setErrorMsg(errorMsg);
catalogData.setTime(Instant.now());
syncingDevices.remove(deviceId);
}
public boolean isSyncing(String deviceId) {
return syncingDevices.contains(deviceId);
}
public boolean isEnd(String deviceId, int sn) {
CatalogData catalogData = dataMap.get(buildMapKey(deviceId, sn));
return catalogData != null && catalogData.getStatus() == CatalogData.CatalogDataStatus.end;
}
public int size(String deviceId, int sn) {

View File

@ -174,8 +174,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// 点播成功 TODO 可以在此处检测cancel命令是否存在存在则不发送
if (userSetting.getUseCustomSsrcForParentInvite()) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
finalInviteInfo.setSsrc(sendSsrcFactory.getSendSsrc(
"Play".equalsIgnoreCase(finalInviteInfo.getSessionName()) ? "0" : "1"));
String sendSsrc = sendSsrcFactory.getSendSsrc(
"Play".equalsIgnoreCase(finalInviteInfo.getSessionName()) ? "0" : "1");
finalInviteInfo.setSsrc(sendSsrc);
}
// 构建sendRTP内容
SendRtpInfo sendRtpItem = sendRtpServerService.createSendRtpInfo(streamInfo.getMediaServer(),

View File

@ -32,7 +32,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
/**
* SIP命令类型 NOTIFY请求中的目录请求处理
@ -294,10 +293,11 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
}
for (Map.Entry<String, List<NotifyCatalogChannel>> entry : grouped.entrySet()) {
ReentrantLock lock = catalogDataManager.getDeviceWriteLock(entry.getKey());
lock.lock();
try {
for (NotifyCatalogChannel notifyCatalogChannel : entry.getValue()) {
if (catalogDataManager.isSyncing(entry.getKey())) {
log.info("[NOTIFY] 设备 {} 正在同步中,跳过本次订阅通知", entry.getKey());
continue;
}
for (NotifyCatalogChannel notifyCatalogChannel : entry.getValue()) {
try {
switch (notifyCatalogChannel.getType()) {
case STATUS_CHANGED:
@ -330,9 +330,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
log.error("[存储收到的通道-异常]类型:{},编号:{}", notifyCatalogChannel.getType(),
notifyCatalogChannel.getChannel().getDeviceId(), e);
}
}
} finally {
lock.unlock();
}
}
}

View File

@ -3,12 +3,16 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respon
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IGroupService;
import com.genersoft.iot.vmp.gb28181.service.IRegionService;
import com.genersoft.iot.vmp.gb28181.session.CatalogDataManager;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.utils.Coordtransform;
import gov.nist.javax.sip.message.SIPRequest;
import java.util.concurrent.locks.ReentrantLock;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
import org.dom4j.Element;
@ -44,6 +48,12 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
@Autowired
private CatalogDataManager catalogDataCatch;
@Autowired
private IRegionService regionService;
@Autowired
private IGroupService groupService;
@Autowired
private SipConfig sipConfig;
@ -144,7 +154,34 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
if (catalogDataCatch.size(device.getDeviceId(), sn) > 0
&& catalogDataCatch.size(device.getDeviceId(), sn) == catalogDataCatch.sumNum(device.getDeviceId(), sn)) {
catalogDataCatch.setComplete(device.getDeviceId(), sn);
ReentrantLock lock = catalogDataCatch.getDeviceWriteLock(device.getDeviceId());
if (!lock.tryLock()) {
log.info("[同步通道] 设备 {} 正在入库中,跳过重复写入", device.getDeviceId());
return;
}
try {
if (catalogDataCatch.isEnd(device.getDeviceId(), sn)) {
return;
}
List<DeviceChannel> channels = catalogDataCatch.getDeviceChannelList(device.getDeviceId(), sn);
if (!channels.isEmpty()) {
deviceChannelService.resetChannels(device.getId(), channels);
}
List<Region> regions = catalogDataCatch.getRegionList(device.getDeviceId(), sn);
if (regions != null && !regions.isEmpty()) {
regionService.batchAdd(regions);
}
List<Group> groups = catalogDataCatch.getGroupList(device.getDeviceId(), sn);
if (groups != null && !groups.isEmpty()) {
groupService.batchAdd(groups);
}
catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), sn, null);
} catch (Exception e) {
log.warn("[同步通道] 直接入库失败,交由定时器兜底", e);
catalogDataCatch.setComplete(device.getDeviceId(), sn);
} finally {
lock.unlock();
}
}
}
}

View File

@ -202,23 +202,27 @@ DELIMITER ;
DELIMITER //
CREATE PROCEDURE `wvp_20260521`()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.STATISTICS
IF NOT EXISTS (SELECT 1
FROM information_schema.STATISTICS
WHERE TABLE_SCHEMA = (SELECT DATABASE())
AND TABLE_NAME = 'wvp_device_channel'
AND INDEX_NAME = 'uk_device_channel_source')
THEN
-- 先清理可能的重复数据
DELETE t1 FROM wvp_device_channel t1
INNER JOIN wvp_device_channel t2
WHERE t1.id < t2.id
AND t1.data_device_id = t2.data_device_id
AND t1.device_id = t2.device_id;
ALTER TABLE wvp_device_channel ADD UNIQUE INDEX uk_device_channel_source (data_device_id, device_id);
-- 用 GROUP BY + LEFT JOIN 替代自连接 DELETE
DELETE t1
FROM wvp_device_channel t1
LEFT JOIN (SELECT MAX(id) as id
FROM wvp_device_channel
GROUP BY data_device_id, device_id) t2 ON t1.id = t2.id
WHERE t2.id IS NULL;
ALTER TABLE wvp_device_channel
ADD UNIQUE INDEX uk_device_channel_source (data_device_id, device_id);
END IF;
END; //
DELIMITER ;
call wvp_20260521();
DROP PROCEDURE wvp_20260521;
DELIMITER ;