修复未开启行政区划和分组推送的上级平台收到行政区划和目录的BUG

This commit is contained in:
lin 2026-06-08 09:54:30 +08:00
parent 27285f321c
commit cc80287124
12 changed files with 155 additions and 362 deletions

View File

@ -16,12 +16,12 @@ public class AlarmChannelMessage {
/** /**
* 报警编号 * 报警编号
*/ */
private int alarmSn; private Integer alarmSn;
/** /**
* 告警类型 * 告警类型
*/ */
private int alarmType; private Integer alarmType;
/** /**
* 报警描述 * 报警描述

View File

@ -19,8 +19,8 @@ public class CatalogData {
private Instant time; private Instant time;
private Device device; private Device device;
private String errorMsg; private String errorMsg;
private boolean complete;
private Set<String> redisKeysForChannel = new HashSet<>(); private Set<String> redisKeysForChannel = new HashSet<>();
private Set<String> errorChannel = new HashSet<>();
private Set<String> redisKeysForRegion = new HashSet<>(); private Set<String> redisKeysForRegion = new HashSet<>();
private Set<String> redisKeysForGroup = new HashSet<>(); private Set<String> redisKeysForGroup = new HashSet<>();

View File

@ -1,44 +0,0 @@
package com.genersoft.iot.vmp.gb28181.bean;
/**
* 码流索引标识
*/
public enum GbSteamIdentification {
/**
* 主码流 stream:0
* 子码流 stream:1s
*/
streamMain("stream", new String[]{"0","1"}),
/**
* 国标28181-2022定义的方式
* 主码流 streamnumber:0
* 子码流 streamnumber:1
*/
streamnumber("streamnumber", new String[]{"0","1"}),
/**
* 主码流 streamprofile:0
* 子码流 streamprofile:1
*/
streamprofile("streamprofile", new String[]{"0","1"}),
/**
* 适用的品牌 TP-LINK
*/
streamMode("streamMode", new String[]{"main","sub"}),
;
GbSteamIdentification(String value, String[] indexArray) {
this.value = value;
this.indexArray = indexArray;
}
private String value;
private String[] indexArray;
public String getValue() {
return value;
}
public String[] getIndexArray() {
return indexArray;
}
}

View File

@ -7,16 +7,19 @@ public class NotifyCatalogChannel {
private DeviceChannel channel; private DeviceChannel channel;
private String deviceId;
public enum Type { public enum Type {
ADD, DELETE, UPDATE, STATUS_CHANGED ADD, DELETE, UPDATE, STATUS_CHANGED
} }
public static NotifyCatalogChannel getInstance(Type type, DeviceChannel channel) { public static NotifyCatalogChannel getInstance(Type type, DeviceChannel channel, String deviceId) {
NotifyCatalogChannel notifyCatalogChannel = new NotifyCatalogChannel(); NotifyCatalogChannel notifyCatalogChannel = new NotifyCatalogChannel();
notifyCatalogChannel.setType(type); notifyCatalogChannel.setType(type);
notifyCatalogChannel.setChannel(channel); notifyCatalogChannel.setChannel(channel);
notifyCatalogChannel.setDeviceId(deviceId);
return notifyCatalogChannel; return notifyCatalogChannel;
} }
@ -35,4 +38,12 @@ public class NotifyCatalogChannel {
public void setChannel(DeviceChannel channel) { public void setChannel(DeviceChannel channel) {
this.channel = channel; this.channel = channel;
} }
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
} }

View File

@ -1,116 +0,0 @@
package com.genersoft.iot.vmp.gb28181.bean;
import io.swagger.v3.oas.annotations.media.Schema;
/**
* 国标级联-目录
* @author lin
*/
@Schema(description = "目录信息")
public class PlatformCatalog {
@Schema(description = "ID")
private String id;
@Schema(description = "名称")
private String name;
@Schema(description = "平台ID")
private String platformId;
@Schema(description = "父级目录ID")
private String parentId;
@Schema(description = "行政区划")
private String civilCode;
@Schema(description = "目录分组")
private String businessGroupId;
/**
* 子节点数
*/
@Schema(description = "子节点数")
private int childrenCount;
/**
* 0 目录, 1 国标通道, 2 直播流
*/
@Schema(description = "类型0 目录, 1 国标通道, 2 直播流")
private int type;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPlatformId() {
return platformId;
}
public void setPlatformId(String platformId) {
this.platformId = platformId;
}
public String getParentId() {
return parentId;
}
public void setParentId(String parentId) {
this.parentId = parentId;
}
public int getChildrenCount() {
return childrenCount;
}
public void setChildrenCount(int childrenCount) {
this.childrenCount = childrenCount;
}
public int getType() {
return type;
}
public void setType(int type) {
this.type = type;
}
public void setTypeForCatalog() {
this.type = 0;
}
public void setTypeForGb() {
this.type = 1;
}
public void setTypeForStream() {
this.type = 2;
}
public String getCivilCode() {
return civilCode;
}
public void setCivilCode(String civilCode) {
this.civilCode = civilCode;
}
public String getBusinessGroupId() {
return businessGroupId;
}
public void setBusinessGroupId(String businessGroupId) {
this.businessGroupId = businessGroupId;
}
}

View File

@ -1,24 +0,0 @@
package com.genersoft.iot.vmp.gb28181.bean;
import lombok.Data;
@Data
public class PlatformCatch {
private String id;
/**
* 心跳未回复次数
*/
private int keepAliveReply;
// 注册未回复次数
private int registerAliveReply;
private String callId;
private Platform platform;
private SipTransactionInfo sipTransactionInfo;
}

View File

@ -404,12 +404,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
allChannelMap.remove(deviceChannel.getDataDeviceId() + deviceChannel.getDeviceId()); allChannelMap.remove(deviceChannel.getDataDeviceId() + deviceChannel.getDeviceId());
upsertChannels.add(deviceChannel); upsertChannels.add(deviceChannel);
if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) { if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) {
if (subContMap.get(deviceChannel.getParentId()) == null) { subContMap.merge(deviceChannel.getParentId(), 1, Integer::sum);
subContMap.put(deviceChannel.getParentId(), 1);
} else {
Integer count = subContMap.get(deviceChannel.getParentId());
subContMap.put(deviceChannel.getParentId(), count++);
}
} }
} }
deleteChannels.addAll(allChannelMap.values()); deleteChannels.addAll(allChannelMap.values());

View File

@ -750,13 +750,16 @@ public class DeviceServiceImpl implements IDeviceService {
@Override @Override
public void sync(Device device) { public void sync(Device device) {
int sn;
synchronized (device.getDeviceId().intern()) {
if (catalogResponseMessageHandler.isSyncRunning(device.getDeviceId())) { if (catalogResponseMessageHandler.isSyncRunning(device.getDeviceId())) {
SyncStatus syncStatus = catalogResponseMessageHandler.getChannelSyncProgress(device.getDeviceId()); SyncStatus syncStatus = catalogResponseMessageHandler.getChannelSyncProgress(device.getDeviceId());
log.info("[同步通道] 同步已存在, 设备: {}, 同步信息: {}", device.getDeviceId(), JSON.toJSON(syncStatus)); log.info("[同步通道] 同步已存在, 设备: {}, 同步信息: {}", device.getDeviceId(), JSON.toJSON(syncStatus));
return; return;
} }
int sn = (int)((Math.random()*9+1)*100000); sn = (int)((Math.random()*9+1)*100000);
catalogResponseMessageHandler.setChannelSyncReady(device, sn); catalogResponseMessageHandler.setChannelSyncReady(device, sn);
}
try { try {
sipCommander.catalogQuery(device, sn, event -> { sipCommander.catalogQuery(device, sn, event -> {
String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg); String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg);

View File

@ -393,17 +393,21 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
int result = platformChannelMapper.addChannels(platformId, channelList); int result = platformChannelMapper.addChannels(platformId, channelList);
if (result > 0) { if (result > 0) {
// 查询通道相关的行政区划信息是否共享如果没共享就添加 // 查询通道相关的行政区划信息是否共享如果没共享就添加
// 判断平台是否客气了推送行政区划
if (platform.getCatalogWithRegion() != 0) {
Set<Region> regionListNotShare = getRegionNotShareByChannelList(channelList, platformId); Set<Region> regionListNotShare = getRegionNotShareByChannelList(channelList, platformId);
if (!regionListNotShare.isEmpty()) { if (!regionListNotShare.isEmpty()) {
int addGroupResult = platformChannelMapper.addPlatformRegion(new ArrayList<>(regionListNotShare), platformId); int addGroupResult = platformChannelMapper.addPlatformRegion(new ArrayList<>(regionListNotShare), platformId);
if (addGroupResult > 0) { if (addGroupResult > 0) {
for (Region region : regionListNotShare) { for (Region region : regionListNotShare) {
// 分组信息排序时需要将顶层排在最后 // 分组信息排序时需要将顶层排在最后
channelList.add(0, CommonGBChannel.build(region)); channelList.addFirst(CommonGBChannel.build(region));
}
} }
} }
} }
if (platform.getCatalogWithGroup() != 0) {
// 查询通道相关的分组信息是否共享如果没共享就添加 // 查询通道相关的分组信息是否共享如果没共享就添加
Set<Group> groupListNotShare = getGroupNotShareByChannelList(channelList, platformId); Set<Group> groupListNotShare = getGroupNotShareByChannelList(channelList, platformId);
if (!groupListNotShare.isEmpty()) { if (!groupListNotShare.isEmpty()) {
@ -411,7 +415,8 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
if (addGroupResult > 0) { if (addGroupResult > 0) {
for (Group group : groupListNotShare) { for (Group group : groupListNotShare) {
// 分组信息排序时需要将顶层排在最后 // 分组信息排序时需要将顶层排在最后
channelList.add(0, CommonGBChannel.build(group)); channelList.addFirst(CommonGBChannel.build(group));
}
} }
} }
} }

View File

@ -16,6 +16,7 @@ import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Iterator;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -56,8 +57,25 @@ public class CatalogDataManager{
} }
public void addReady(Device device, int sn ) { public void addReady(Device device, int sn ) {
CatalogData catalogData = dataMap.get(buildMapKey(device.getDeviceId(),sn)); // 清除该设备的所有旧条目
if (catalogData != null) { Iterator<Map.Entry<String, CatalogData>> it = dataMap.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, CatalogData> entry = it.next();
CatalogData old = entry.getValue();
if (old != null && device.getDeviceId().equals(old.getDevice().getDeviceId())) {
deleteRedisKeys(old);
it.remove();
}
}
CatalogData catalogData = new CatalogData();
catalogData.setDevice(device);
catalogData.setSn(sn);
catalogData.setStatus(CatalogData.CatalogDataStatus.ready);
catalogData.setTime(Instant.now());
dataMap.put(buildMapKey(device.getDeviceId(),sn), catalogData);
}
private void deleteRedisKeys(CatalogData catalogData) {
Set<String> redisKeysForChannel = catalogData.getRedisKeysForChannel(); Set<String> redisKeysForChannel = catalogData.getRedisKeysForChannel();
if (redisKeysForChannel != null && !redisKeysForChannel.isEmpty()) { if (redisKeysForChannel != null && !redisKeysForChannel.isEmpty()) {
for (String deleteKey : redisKeysForChannel) { for (String deleteKey : redisKeysForChannel) {
@ -76,14 +94,6 @@ public class CatalogDataManager{
redisTemplate.opsForHash().delete(key, deleteKey); redisTemplate.opsForHash().delete(key, deleteKey);
} }
} }
dataMap.remove(buildMapKey(device.getDeviceId(),sn));
}
catalogData = new CatalogData();
catalogData.setDevice(device);
catalogData.setSn(sn);
catalogData.setStatus(CatalogData.CatalogDataStatus.ready);
catalogData.setTime(Instant.now());
dataMap.put(buildMapKey(device.getDeviceId(),sn), catalogData);
} }
public void put(String deviceId, int sn, int total, Device device, List<DeviceChannel> deviceChannelList, public void put(String deviceId, int sn, int total, Device device, List<DeviceChannel> deviceChannelList,
@ -188,10 +198,6 @@ public class CatalogDataManager{
}else { }else {
syncStatus.setSyncIng(true); syncStatus.setSyncIng(true);
} }
if (catalogData.getErrorMsg() != null) {
// 失败的同步信息,返回一次后直接移除
dataMap.remove(key);
}
return syncStatus; return syncStatus;
} }
} }
@ -249,20 +255,19 @@ public class CatalogDataManager{
catalogData.setStatus(CatalogData.CatalogDataStatus.end); catalogData.setStatus(CatalogData.CatalogDataStatus.end);
} }
}else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) { }else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) {
if ( catalogData.getTime().isBefore(instantBefore5S)) { boolean complete = catalogData.isComplete();
boolean timeout = catalogData.getTime().isBefore(instantBefore5S);
if (complete || timeout) {
String deviceId = catalogData.getDevice().getDeviceId(); String deviceId = catalogData.getDevice().getDeviceId();
ReentrantLock lock = getDeviceWriteLock(deviceId); ReentrantLock lock = getDeviceWriteLock(deviceId);
if (!lock.tryLock()) { if (!lock.tryLock()) {
// saveData() 正在执行跳过本次等下一个5s周期
continue; continue;
} }
try { try {
int sn = catalogData.getSn(); int sn = catalogData.getSn();
List<DeviceChannel> deviceChannelList = getDeviceChannelList(deviceId, sn); List<DeviceChannel> deviceChannelList = getDeviceChannelList(deviceId, sn);
if (catalogData.getTotal() == deviceChannelList.size()) { if (!deviceChannelList.isEmpty()) {
deviceChannelService.resetChannels(catalogData.getDevice().getId(), deviceChannelList); deviceChannelService.resetChannels(catalogData.getDevice().getId(), deviceChannelList);
} else {
deviceChannelService.updateChannels(catalogData.getDevice(), deviceChannelList);
} }
List<Region> regionList = getRegionList(deviceId, sn); List<Region> regionList = getRegionList(deviceId, sn);
if ( regionList!= null && !regionList.isEmpty()) { if ( regionList!= null && !regionList.isEmpty()) {
@ -272,10 +277,10 @@ public class CatalogDataManager{
if (groupList != null && !groupList.isEmpty()) { if (groupList != null && !groupList.isEmpty()) {
groupService.batchAdd(groupList); groupService.batchAdd(groupList);
} }
String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + deviceChannelList.size() + ""; catalogData.setErrorMsg(null);
catalogData.setErrorMsg(errorMsg);
} catch (Exception e) { } catch (Exception e) {
log.error("[国标通道同步] 入库失败: ", e); log.error("[国标通道同步] 入库失败: ", e);
catalogData.setErrorMsg("入库失败: " + e.getMessage());
} finally { } finally {
lock.unlock(); lock.unlock();
} }
@ -289,30 +294,21 @@ public class CatalogDataManager{
if (deviceWriteLocks.containsKey(deviceId)) { if (deviceWriteLocks.containsKey(deviceId)) {
deviceWriteLocks.remove(deviceId); deviceWriteLocks.remove(deviceId);
} }
Set<String> redisKeysForChannel = catalogData.getRedisKeysForChannel(); deleteRedisKeys(catalogData);
if (redisKeysForChannel != null && !redisKeysForChannel.isEmpty()) {
for (String deleteKey : redisKeysForChannel) {
redisTemplate.opsForHash().delete(key, deleteKey);
}
}
Set<String> redisKeysForRegion = catalogData.getRedisKeysForRegion();
if (redisKeysForRegion != null && !redisKeysForRegion.isEmpty()) {
for (String deleteKey : redisKeysForRegion) {
redisTemplate.opsForHash().delete(key, deleteKey);
}
}
Set<String> redisKeysForGroup = catalogData.getRedisKeysForGroup();
if (redisKeysForGroup != null && !redisKeysForGroup.isEmpty()) {
for (String deleteKey : redisKeysForGroup) {
redisTemplate.opsForHash().delete(key, deleteKey);
}
}
} }
} }
} }
} }
public void setComplete(String deviceId, int sn) {
CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn));
if (catalogData == null) {
return;
}
catalogData.setComplete(true);
}
public void setChannelSyncEnd(String deviceId, int sn, String errorMsg) { public void setChannelSyncEnd(String deviceId, int sn, String errorMsg) {
CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn)); CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn));
if (catalogData == null) { if (catalogData == null) {
@ -328,7 +324,7 @@ public class CatalogDataManager{
if (catalogData == null) { if (catalogData == null) {
return 0; return 0;
} }
return catalogData.getRedisKeysForChannel().size() + catalogData.getErrorChannel().size(); return catalogData.getRedisKeysForChannel().size();
} }
public int sumNum(String deviceId, int sn) { public int sumNum(String deviceId, int sn) {

View File

@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.gb28181.session.CatalogDataManager;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@ -26,9 +27,12 @@ import javax.sip.header.FromHeader;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
/** /**
* SIP命令类型 NOTIFY请求中的目录请求处理 * SIP命令类型 NOTIFY请求中的目录请求处理
@ -56,6 +60,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
@Autowired @Autowired
private IGbChannelService channelService; private IGbChannelService channelService;
@Autowired
private CatalogDataManager catalogDataManager;
public void process(RequestEvent evt) { public void process(RequestEvent evt) {
if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
@ -149,7 +156,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
// 上线 // 上线
log.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId()); log.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
channel.setStatus("ON"); channel.setStatus("ON");
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel)); channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel, device.getDeviceId()));
if (userSetting.getDeviceStatusNotify()) { if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息 // 发送redis消息
@ -163,7 +170,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
log.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId()); log.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
} else { } else {
channel.setStatus("OFF"); channel.setStatus("OFF");
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel)); channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel, device.getDeviceId()));
if (userSetting.getDeviceStatusNotify()) { if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息 // 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), false); redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), false);
@ -177,7 +184,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
log.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId()); log.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
} else { } else {
channel.setStatus("OFF"); channel.setStatus("OFF");
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel)); channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel, device.getDeviceId()));
if (userSetting.getDeviceStatusNotify()) { if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息 // 发送redis消息
@ -192,7 +199,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
log.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId()); log.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
} else { } else {
channel.setStatus("OFF"); channel.setStatus("OFF");
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel)); channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel, device.getDeviceId()));
if (userSetting.getDeviceStatusNotify()) { if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息 // 发送redis消息
@ -210,12 +217,12 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
channel.setId(deviceChannel.getId()); channel.setId(deviceChannel.getId());
channel.setHasAudio(deviceChannel.isHasAudio()); channel.setHasAudio(deviceChannel.isHasAudio());
channel.setUpdateTime(DateUtil.getNow()); channel.setUpdateTime(DateUtil.getNow());
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.UPDATE, channel)); channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.UPDATE, channel, device.getDeviceId()));
} else { } else {
catalogChannelEvent.getChannel().setUpdateTime(DateUtil.getNow()); catalogChannelEvent.getChannel().setUpdateTime(DateUtil.getNow());
catalogChannelEvent.getChannel().setCreateTime(DateUtil.getNow()); catalogChannelEvent.getChannel().setCreateTime(DateUtil.getNow());
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.ADD, channel)); channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.ADD, channel, device.getDeviceId()));
if (userSetting.getDeviceStatusNotify()) { if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息 // 发送redis消息
@ -227,7 +234,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
case CatalogEvent.DEL: case CatalogEvent.DEL:
// 删除 // 删除
log.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId()); log.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.DELETE, channel)); channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.DELETE, channel, device.getDeviceId()));
if (userSetting.getDeviceStatusNotify()) { if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息 // 发送redis消息
@ -244,12 +251,12 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
channel.setHasAudio(deviceChannelForUpdate.isHasAudio()); channel.setHasAudio(deviceChannelForUpdate.isHasAudio());
channel.setUpdateTime(DateUtil.getNow()); channel.setUpdateTime(DateUtil.getNow());
channel.setUpdateTime(DateUtil.getNow()); channel.setUpdateTime(DateUtil.getNow());
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.UPDATE, channel)); channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.UPDATE, channel, device.getDeviceId()));
} else { } else {
catalogChannelEvent.getChannel().setCreateTime(DateUtil.getNow()); catalogChannelEvent.getChannel().setCreateTime(DateUtil.getNow());
catalogChannelEvent.getChannel().setUpdateTime(DateUtil.getNow()); catalogChannelEvent.getChannel().setUpdateTime(DateUtil.getNow());
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.ADD, channel)); channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.ADD, channel, device.getDeviceId()));
if (userSetting.getDeviceStatusNotify()) { if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息 // 发送redis消息
@ -281,7 +288,16 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
channelListForSave.add(channelList.poll()); channelListForSave.add(channelList.poll());
} }
for (NotifyCatalogChannel notifyCatalogChannel : channelListForSave) { Map<String, List<NotifyCatalogChannel>> grouped = new HashMap<>();
for (NotifyCatalogChannel item : channelListForSave) {
grouped.computeIfAbsent(item.getDeviceId(), k -> new ArrayList<>()).add(item);
}
for (Map.Entry<String, List<NotifyCatalogChannel>> entry : grouped.entrySet()) {
ReentrantLock lock = catalogDataManager.getDeviceWriteLock(entry.getKey());
lock.lock();
try {
for (NotifyCatalogChannel notifyCatalogChannel : entry.getValue()) {
try { try {
switch (notifyCatalogChannel.getType()) { switch (notifyCatalogChannel.getType()) {
case STATUS_CHANGED: case STATUS_CHANGED:
@ -315,5 +331,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
notifyCatalogChannel.getChannel().getDeviceId(), e); notifyCatalogChannel.getChannel().getDeviceId(), e);
} }
} }
} finally {
lock.unlock();
}
}
} }
} }

View File

@ -3,8 +3,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respon
import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IGroupService;
import com.genersoft.iot.vmp.gb28181.service.IRegionService;
import com.genersoft.iot.vmp.gb28181.session.CatalogDataManager; import com.genersoft.iot.vmp.gb28181.session.CatalogDataManager;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
@ -17,7 +15,6 @@ import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
@ -28,9 +25,6 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.lang.Thread;
/** /**
* 目录查询的回复 * 目录查询的回复
@ -44,17 +38,9 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
@Autowired @Autowired
private ResponseMessageHandler responseMessageHandler; private ResponseMessageHandler responseMessageHandler;
private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
@Autowired @Autowired
private IDeviceChannelService deviceChannelService; private IDeviceChannelService deviceChannelService;
@Autowired
private IRegionService regionService;
@Autowired
private IGroupService groupService;
@Autowired @Autowired
private CatalogDataManager catalogDataCatch; private CatalogDataManager catalogDataCatch;
@ -155,56 +141,17 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device, catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device,
channelList, regionList, groupList); channelList, regionList, groupList);
log.info("[收到通道]设备: {} -> {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.size(device.getDeviceId(), sn), sumNum); log.info("[收到通道]设备: {} -> {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.size(device.getDeviceId(), sn), sumNum);
if (catalogDataCatch.size(device.getDeviceId(), sn) > 0
&& catalogDataCatch.size(device.getDeviceId(), sn) == catalogDataCatch.sumNum(device.getDeviceId(), sn)) {
catalogDataCatch.setComplete(device.getDeviceId(), sn);
}
} }
} }
} catch (Exception e) { } catch (Exception e) {
log.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest()); log.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest());
log.error("[收到通道] 异常内容: ", e); log.error("[收到通道] 异常内容: ", e);
} finally {
String deviceId = device.getDeviceId();
if (catalogDataCatch.size(deviceId, sn) > 0
&& catalogDataCatch.size(deviceId, sn) == catalogDataCatch.sumNum(deviceId, sn)) {
// 数据已经完整接收 此时可能存在某个设备离线变上线的情况但是考虑到性能此处不做处理
// 目前支持设备通道上线通知时和设备上线时向上级通知
int finalSn = sn;
Thread.startVirtualThread(() -> {
ReentrantLock lock = catalogDataCatch.getDeviceWriteLock(device.getDeviceId());
lock.lock();
try {
boolean resetChannelsResult = saveData(device, finalSn);
if (!resetChannelsResult) {
String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(deviceId, finalSn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(device.getDeviceId(), finalSn).size() + "";
catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, errorMsg);
} else {
catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, null);
} }
} finally {
lock.unlock();
}
});
}
}
}
@Transactional
public boolean saveData(Device device, int sn) {
boolean result = true;
List<DeviceChannel> deviceChannelList = catalogDataCatch.getDeviceChannelList(device.getDeviceId(), sn);
if (deviceChannelList != null && !deviceChannelList.isEmpty()) {
result &= deviceChannelService.resetChannels(device.getId(), deviceChannelList);
}
List<Region> regionList = catalogDataCatch.getRegionList(device.getDeviceId(), sn);
if ( regionList!= null && !regionList.isEmpty()) {
result &= regionService.batchAdd(regionList);
}
List<Group> groupList = catalogDataCatch.getGroupList(device.getDeviceId(), sn);
if (groupList != null && !groupList.isEmpty()) {
result &= groupService.batchAdd(groupList);
}
return result;
} }
@Override @Override