Compare commits

...

5 Commits

Author SHA1 Message Date
阿斌
05f799f51e
Pre Merge pull request !36 from 阿斌/N/A 2026-06-08 04:40:11 +00:00
lin
9aca5aab35 优化目录刷新写入逻辑 2026-06-08 12:39:52 +08:00
lin
cc80287124 修复未开启行政区划和分组推送的上级平台收到行政区划和目录的BUG 2026-06-08 09:54:30 +08:00
lin
27285f321c 修复国标级联上级tcp主动点播概率下失败的BUG 2026-06-05 09:49:11 +08:00
阿斌
da98101aac
update src/main/resources/civilCode.csv.
行政规划错误。江苏南通海门市,修改为海门区,浙江杭州删除下城区、江干区,新增钱塘区,临平区

Signed-off-by: 阿斌 <38912748@qq.com>
2024-12-15 08:58:42 +00:00
16 changed files with 239 additions and 388 deletions

View File

@ -16,12 +16,12 @@ public class AlarmChannelMessage {
/**
* 报警编号
*/
private int alarmSn;
private Integer alarmSn;
/**
* 告警类型
*/
private int alarmType;
private Integer alarmType;
/**
* 报警描述

View File

@ -19,8 +19,8 @@ public class CatalogData {
private Instant time;
private Device device;
private String errorMsg;
private boolean complete;
private Set<String> redisKeysForChannel = new HashSet<>();
private Set<String> errorChannel = new HashSet<>();
private Set<String> redisKeysForRegion = new HashSet<>();
private Set<String> redisKeysForGroup = new HashSet<>();

View File

@ -1,44 +0,0 @@
package com.genersoft.iot.vmp.gb28181.bean;
/**
* 码流索引标识
*/
public enum GbSteamIdentification {
/**
* 主码流 stream:0
* 子码流 stream:1s
*/
streamMain("stream", new String[]{"0","1"}),
/**
* 国标28181-2022定义的方式
* 主码流 streamnumber:0
* 子码流 streamnumber:1
*/
streamnumber("streamnumber", new String[]{"0","1"}),
/**
* 主码流 streamprofile:0
* 子码流 streamprofile:1
*/
streamprofile("streamprofile", new String[]{"0","1"}),
/**
* 适用的品牌 TP-LINK
*/
streamMode("streamMode", new String[]{"main","sub"}),
;
GbSteamIdentification(String value, String[] indexArray) {
this.value = value;
this.indexArray = indexArray;
}
private String value;
private String[] indexArray;
public String getValue() {
return value;
}
public String[] getIndexArray() {
return indexArray;
}
}

View File

@ -7,16 +7,19 @@ public class NotifyCatalogChannel {
private DeviceChannel channel;
private String deviceId;
public enum Type {
ADD, DELETE, UPDATE, STATUS_CHANGED
}
public static NotifyCatalogChannel getInstance(Type type, DeviceChannel channel) {
public static NotifyCatalogChannel getInstance(Type type, DeviceChannel channel, String deviceId) {
NotifyCatalogChannel notifyCatalogChannel = new NotifyCatalogChannel();
notifyCatalogChannel.setType(type);
notifyCatalogChannel.setChannel(channel);
notifyCatalogChannel.setDeviceId(deviceId);
return notifyCatalogChannel;
}
@ -35,4 +38,12 @@ public class NotifyCatalogChannel {
public void setChannel(DeviceChannel channel) {
this.channel = channel;
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
}

View File

@ -1,116 +0,0 @@
package com.genersoft.iot.vmp.gb28181.bean;
import io.swagger.v3.oas.annotations.media.Schema;
/**
* 国标级联-目录
* @author lin
*/
@Schema(description = "目录信息")
public class PlatformCatalog {
@Schema(description = "ID")
private String id;
@Schema(description = "名称")
private String name;
@Schema(description = "平台ID")
private String platformId;
@Schema(description = "父级目录ID")
private String parentId;
@Schema(description = "行政区划")
private String civilCode;
@Schema(description = "目录分组")
private String businessGroupId;
/**
* 子节点数
*/
@Schema(description = "子节点数")
private int childrenCount;
/**
* 0 目录, 1 国标通道, 2 直播流
*/
@Schema(description = "类型0 目录, 1 国标通道, 2 直播流")
private int type;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPlatformId() {
return platformId;
}
public void setPlatformId(String platformId) {
this.platformId = platformId;
}
public String getParentId() {
return parentId;
}
public void setParentId(String parentId) {
this.parentId = parentId;
}
public int getChildrenCount() {
return childrenCount;
}
public void setChildrenCount(int childrenCount) {
this.childrenCount = childrenCount;
}
public int getType() {
return type;
}
public void setType(int type) {
this.type = type;
}
public void setTypeForCatalog() {
this.type = 0;
}
public void setTypeForGb() {
this.type = 1;
}
public void setTypeForStream() {
this.type = 2;
}
public String getCivilCode() {
return civilCode;
}
public void setCivilCode(String civilCode) {
this.civilCode = civilCode;
}
public String getBusinessGroupId() {
return businessGroupId;
}
public void setBusinessGroupId(String businessGroupId) {
this.businessGroupId = businessGroupId;
}
}

View File

@ -1,24 +0,0 @@
package com.genersoft.iot.vmp.gb28181.bean;
import lombok.Data;
@Data
public class PlatformCatch {
private String id;
/**
* 心跳未回复次数
*/
private int keepAliveReply;
// 注册未回复次数
private int registerAliveReply;
private String callId;
private Platform platform;
private SipTransactionInfo sipTransactionInfo;
}

View File

@ -551,7 +551,7 @@ public interface PlatformChannelMapper {
@Select("<script>" +
" select " +
" wpgc.platform_id as platform_id" +
" wpgc.platform_id as platform_id,\n" +
" wdc.id as gb_id,\n" +
" wdc.data_type,\n" +
" wdc.data_device_id,\n" +

View File

@ -404,12 +404,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
allChannelMap.remove(deviceChannel.getDataDeviceId() + deviceChannel.getDeviceId());
upsertChannels.add(deviceChannel);
if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) {
if (subContMap.get(deviceChannel.getParentId()) == null) {
subContMap.put(deviceChannel.getParentId(), 1);
} else {
Integer count = subContMap.get(deviceChannel.getParentId());
subContMap.put(deviceChannel.getParentId(), count++);
}
subContMap.merge(deviceChannel.getParentId(), 1, Integer::sum);
}
}
deleteChannels.addAll(allChannelMap.values());

View File

@ -750,13 +750,16 @@ public class DeviceServiceImpl implements IDeviceService {
@Override
public void sync(Device device) {
int sn;
synchronized (device.getDeviceId().intern()) {
if (catalogResponseMessageHandler.isSyncRunning(device.getDeviceId())) {
SyncStatus syncStatus = catalogResponseMessageHandler.getChannelSyncProgress(device.getDeviceId());
log.info("[同步通道] 同步已存在, 设备: {}, 同步信息: {}", device.getDeviceId(), JSON.toJSON(syncStatus));
return;
}
int sn = (int)((Math.random()*9+1)*100000);
sn = (int)((Math.random()*9+1)*100000);
catalogResponseMessageHandler.setChannelSyncReady(device, sn);
}
try {
sipCommander.catalogQuery(device, sn, event -> {
String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg);

View File

@ -393,17 +393,21 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
int result = platformChannelMapper.addChannels(platformId, channelList);
if (result > 0) {
// 查询通道相关的行政区划信息是否共享如果没共享就添加
// 判断平台是否客气了推送行政区划
if (platform.getCatalogWithRegion() != 0) {
Set<Region> regionListNotShare = getRegionNotShareByChannelList(channelList, platformId);
if (!regionListNotShare.isEmpty()) {
int addGroupResult = platformChannelMapper.addPlatformRegion(new ArrayList<>(regionListNotShare), platformId);
if (addGroupResult > 0) {
for (Region region : regionListNotShare) {
// 分组信息排序时需要将顶层排在最后
channelList.add(0, CommonGBChannel.build(region));
channelList.addFirst(CommonGBChannel.build(region));
}
}
}
}
if (platform.getCatalogWithGroup() != 0) {
// 查询通道相关的分组信息是否共享如果没共享就添加
Set<Group> groupListNotShare = getGroupNotShareByChannelList(channelList, platformId);
if (!groupListNotShare.isEmpty()) {
@ -411,7 +415,8 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
if (addGroupResult > 0) {
for (Group group : groupListNotShare) {
// 分组信息排序时需要将顶层排在最后
channelList.add(0, CommonGBChannel.build(group));
channelList.addFirst(CommonGBChannel.build(group));
}
}
}
}

View File

@ -16,6 +16,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@ -39,6 +40,8 @@ public class CatalogDataManager{
private final Map<String, CatalogData> dataMap = new ConcurrentHashMap<>();
private final Set<String> syncingDevices = ConcurrentHashMap.newKeySet();
private final Map<String, ReentrantLock> deviceWriteLocks = new ConcurrentHashMap<>();
public ReentrantLock getDeviceWriteLock(String deviceId) {
@ -56,8 +59,26 @@ public class CatalogDataManager{
}
public void addReady(Device device, int sn ) {
CatalogData catalogData = dataMap.get(buildMapKey(device.getDeviceId(),sn));
if (catalogData != null) {
// 清除该设备的所有旧条目
Iterator<Map.Entry<String, CatalogData>> it = dataMap.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, CatalogData> entry = it.next();
CatalogData old = entry.getValue();
if (old != null && device.getDeviceId().equals(old.getDevice().getDeviceId())) {
deleteRedisKeys(old);
it.remove();
}
}
CatalogData catalogData = new CatalogData();
catalogData.setDevice(device);
catalogData.setSn(sn);
catalogData.setStatus(CatalogData.CatalogDataStatus.ready);
catalogData.setTime(Instant.now());
dataMap.put(buildMapKey(device.getDeviceId(),sn), catalogData);
syncingDevices.add(device.getDeviceId());
}
private void deleteRedisKeys(CatalogData catalogData) {
Set<String> redisKeysForChannel = catalogData.getRedisKeysForChannel();
if (redisKeysForChannel != null && !redisKeysForChannel.isEmpty()) {
for (String deleteKey : redisKeysForChannel) {
@ -76,14 +97,6 @@ public class CatalogDataManager{
redisTemplate.opsForHash().delete(key, deleteKey);
}
}
dataMap.remove(buildMapKey(device.getDeviceId(),sn));
}
catalogData = new CatalogData();
catalogData.setDevice(device);
catalogData.setSn(sn);
catalogData.setStatus(CatalogData.CatalogDataStatus.ready);
catalogData.setTime(Instant.now());
dataMap.put(buildMapKey(device.getDeviceId(),sn), catalogData);
}
public void put(String deviceId, int sn, int total, Device device, List<DeviceChannel> deviceChannelList,
@ -188,10 +201,6 @@ public class CatalogDataManager{
}else {
syncStatus.setSyncIng(true);
}
if (catalogData.getErrorMsg() != null) {
// 失败的同步信息,返回一次后直接移除
dataMap.remove(key);
}
return syncStatus;
}
}
@ -247,22 +256,22 @@ public class CatalogDataManager{
String errorMsg = "同步失败,等待回复超时";
catalogData.setErrorMsg(errorMsg);
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
syncingDevices.remove(catalogData.getDevice().getDeviceId());
}
}else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) {
if ( catalogData.getTime().isBefore(instantBefore5S)) {
boolean complete = catalogData.isComplete();
boolean timeout = catalogData.getTime().isBefore(instantBefore5S);
if (complete || timeout) {
String deviceId = catalogData.getDevice().getDeviceId();
ReentrantLock lock = getDeviceWriteLock(deviceId);
if (!lock.tryLock()) {
// saveData() 正在执行跳过本次等下一个5s周期
continue;
}
try {
int sn = catalogData.getSn();
List<DeviceChannel> deviceChannelList = getDeviceChannelList(deviceId, sn);
if (catalogData.getTotal() == deviceChannelList.size()) {
if (!deviceChannelList.isEmpty()) {
deviceChannelService.resetChannels(catalogData.getDevice().getId(), deviceChannelList);
} else {
deviceChannelService.updateChannels(catalogData.getDevice(), deviceChannelList);
}
List<Region> regionList = getRegionList(deviceId, sn);
if ( regionList!= null && !regionList.isEmpty()) {
@ -272,14 +281,15 @@ public class CatalogDataManager{
if (groupList != null && !groupList.isEmpty()) {
groupService.batchAdd(groupList);
}
String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + deviceChannelList.size() + "";
catalogData.setErrorMsg(errorMsg);
catalogData.setErrorMsg(null);
} catch (Exception e) {
log.error("[国标通道同步] 入库失败: ", e);
catalogData.setErrorMsg("入库失败: " + e.getMessage());
} finally {
lock.unlock();
}
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
syncingDevices.remove(deviceId);
}
}else {
if (catalogData.getTime().isBefore(instantBefore30S)) {
@ -289,30 +299,22 @@ public class CatalogDataManager{
if (deviceWriteLocks.containsKey(deviceId)) {
deviceWriteLocks.remove(deviceId);
}
Set<String> redisKeysForChannel = catalogData.getRedisKeysForChannel();
if (redisKeysForChannel != null && !redisKeysForChannel.isEmpty()) {
for (String deleteKey : redisKeysForChannel) {
redisTemplate.opsForHash().delete(key, deleteKey);
}
}
Set<String> redisKeysForRegion = catalogData.getRedisKeysForRegion();
if (redisKeysForRegion != null && !redisKeysForRegion.isEmpty()) {
for (String deleteKey : redisKeysForRegion) {
redisTemplate.opsForHash().delete(key, deleteKey);
}
}
Set<String> redisKeysForGroup = catalogData.getRedisKeysForGroup();
if (redisKeysForGroup != null && !redisKeysForGroup.isEmpty()) {
for (String deleteKey : redisKeysForGroup) {
redisTemplate.opsForHash().delete(key, deleteKey);
}
}
syncingDevices.remove(deviceId);
deleteRedisKeys(catalogData);
}
}
}
}
public void setComplete(String deviceId, int sn) {
CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn));
if (catalogData == null) {
return;
}
catalogData.setComplete(true);
}
public void setChannelSyncEnd(String deviceId, int sn, String errorMsg) {
CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn));
if (catalogData == null) {
@ -321,6 +323,16 @@ public class CatalogDataManager{
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
catalogData.setErrorMsg(errorMsg);
catalogData.setTime(Instant.now());
syncingDevices.remove(deviceId);
}
public boolean isSyncing(String deviceId) {
return syncingDevices.contains(deviceId);
}
public boolean isEnd(String deviceId, int sn) {
CatalogData catalogData = dataMap.get(buildMapKey(deviceId, sn));
return catalogData != null && catalogData.getStatus() == CatalogData.CatalogDataStatus.end;
}
public int size(String deviceId, int sn) {
@ -328,7 +340,7 @@ public class CatalogDataManager{
if (catalogData == null) {
return 0;
}
return catalogData.getRedisKeysForChannel().size() + catalogData.getErrorChannel().size();
return catalogData.getRedisKeysForChannel().size();
}
public int sumNum(String deviceId, int sn) {

View File

@ -174,8 +174,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// 点播成功 TODO 可以在此处检测cancel命令是否存在存在则不发送
if (userSetting.getUseCustomSsrcForParentInvite()) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
finalInviteInfo.setSsrc(sendSsrcFactory.getSendSsrc(
"Play".equalsIgnoreCase(finalInviteInfo.getSessionName()) ? "0" : "1"));
String sendSsrc = sendSsrcFactory.getSendSsrc(
"Play".equalsIgnoreCase(finalInviteInfo.getSessionName()) ? "0" : "1");
finalInviteInfo.setSsrc(sendSsrc);
}
// 构建sendRTP内容
SendRtpInfo sendRtpItem = sendRtpServerService.createSendRtpInfo(streamInfo.getMediaServer(),
@ -196,6 +197,22 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sdpIp = platform.getSendStreamIp();
}
String content = createSendSdp(sendRtpItem, finalInviteInfo, sdpIp);
// tcp主动模式回复sdp后开启监听
if (sendRtpItem.isTcpActive()) {
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
try {
mediaServerService.startSendRtpPassive(mediaServer, sendRtpItem, 10000);
DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(sendRtpItem.getChannelId());
if (deviceChannel != null) {
redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, deviceChannel, platform);
}
} catch (ControllerException e) {
log.warn("[上级INVITE] tcp主动模式 发流失败", e);
sendBye(platform, finalInviteInfo.getCallId());
}
}
// 超时未收到Ack应该回复bye,当前等待时间为10秒
dynamicTask.startDelay(finalInviteInfo.getCallId(), () -> {
log.info("[Ack ] 等待超时, {}/{}", finalInviteInfo.getCallId(), channel.getGbDeviceId());
@ -208,20 +225,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
log.error("[命令发送失败] 上级INVITE 发送 200SDP: {}", e.getMessage());
}
// tcp主动模式回复sdp后开启监听
if (sendRtpItem.isTcpActive()) {
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
try {
mediaServerService.startSendRtpPassive(mediaServer, sendRtpItem, 5);
DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(sendRtpItem.getChannelId());
if (deviceChannel != null) {
redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, deviceChannel, platform);
}
} catch (ControllerException e) {
log.warn("[上级INVITE] tcp主动模式 发流失败", e);
sendBye(platform, finalInviteInfo.getCallId());
}
}
}
}));
}

View File

@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.gb28181.session.CatalogDataManager;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@ -26,8 +27,10 @@ import javax.sip.header.FromHeader;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@ -56,6 +59,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
@Autowired
private IGbChannelService channelService;
@Autowired
private CatalogDataManager catalogDataManager;
public void process(RequestEvent evt) {
if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
@ -149,7 +155,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
// 上线
log.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
channel.setStatus("ON");
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel));
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel, device.getDeviceId()));
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
@ -163,7 +169,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
log.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
} else {
channel.setStatus("OFF");
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel));
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel, device.getDeviceId()));
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), false);
@ -177,7 +183,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
log.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
} else {
channel.setStatus("OFF");
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel));
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel, device.getDeviceId()));
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
@ -192,7 +198,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
log.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
} else {
channel.setStatus("OFF");
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel));
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel, device.getDeviceId()));
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
@ -210,12 +216,12 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
channel.setId(deviceChannel.getId());
channel.setHasAudio(deviceChannel.isHasAudio());
channel.setUpdateTime(DateUtil.getNow());
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.UPDATE, channel));
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.UPDATE, channel, device.getDeviceId()));
} else {
catalogChannelEvent.getChannel().setUpdateTime(DateUtil.getNow());
catalogChannelEvent.getChannel().setCreateTime(DateUtil.getNow());
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.ADD, channel));
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.ADD, channel, device.getDeviceId()));
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
@ -227,7 +233,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
case CatalogEvent.DEL:
// 删除
log.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId());
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.DELETE, channel));
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.DELETE, channel, device.getDeviceId()));
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
@ -244,12 +250,12 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
channel.setHasAudio(deviceChannelForUpdate.isHasAudio());
channel.setUpdateTime(DateUtil.getNow());
channel.setUpdateTime(DateUtil.getNow());
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.UPDATE, channel));
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.UPDATE, channel, device.getDeviceId()));
} else {
catalogChannelEvent.getChannel().setCreateTime(DateUtil.getNow());
catalogChannelEvent.getChannel().setUpdateTime(DateUtil.getNow());
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.ADD, channel));
channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.ADD, channel, device.getDeviceId()));
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
@ -281,7 +287,17 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
channelListForSave.add(channelList.poll());
}
for (NotifyCatalogChannel notifyCatalogChannel : channelListForSave) {
Map<String, List<NotifyCatalogChannel>> grouped = new HashMap<>();
for (NotifyCatalogChannel item : channelListForSave) {
grouped.computeIfAbsent(item.getDeviceId(), k -> new ArrayList<>()).add(item);
}
for (Map.Entry<String, List<NotifyCatalogChannel>> entry : grouped.entrySet()) {
if (catalogDataManager.isSyncing(entry.getKey())) {
log.info("[NOTIFY] 设备 {} 正在同步中,跳过本次订阅通知", entry.getKey());
continue;
}
for (NotifyCatalogChannel notifyCatalogChannel : entry.getValue()) {
try {
switch (notifyCatalogChannel.getType()) {
case STATUS_CHANGED:
@ -316,4 +332,5 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
}
}
}
}
}

View File

@ -11,13 +11,14 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessag
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.utils.Coordtransform;
import gov.nist.javax.sip.message.SIPRequest;
import java.util.concurrent.locks.ReentrantLock;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
@ -28,9 +29,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.lang.Thread;
/**
* 目录查询的回复
@ -44,20 +42,18 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
@Autowired
private ResponseMessageHandler responseMessageHandler;
private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
@Autowired
private IDeviceChannelService deviceChannelService;
@Autowired
private CatalogDataManager catalogDataCatch;
@Autowired
private IRegionService regionService;
@Autowired
private IGroupService groupService;
@Autowired
private CatalogDataManager catalogDataCatch;
@Autowired
private SipConfig sipConfig;
@ -155,56 +151,44 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device,
channelList, regionList, groupList);
log.info("[收到通道]设备: {} -> {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.size(device.getDeviceId(), sn), sumNum);
if (catalogDataCatch.size(device.getDeviceId(), sn) > 0
&& catalogDataCatch.size(device.getDeviceId(), sn) == catalogDataCatch.sumNum(device.getDeviceId(), sn)) {
ReentrantLock lock = catalogDataCatch.getDeviceWriteLock(device.getDeviceId());
if (!lock.tryLock()) {
log.info("[同步通道] 设备 {} 正在入库中,跳过重复写入", device.getDeviceId());
return;
}
try {
if (catalogDataCatch.isEnd(device.getDeviceId(), sn)) {
return;
}
List<DeviceChannel> channels = catalogDataCatch.getDeviceChannelList(device.getDeviceId(), sn);
if (!channels.isEmpty()) {
deviceChannelService.resetChannels(device.getId(), channels);
}
List<Region> regions = catalogDataCatch.getRegionList(device.getDeviceId(), sn);
if (regions != null && !regions.isEmpty()) {
regionService.batchAdd(regions);
}
List<Group> groups = catalogDataCatch.getGroupList(device.getDeviceId(), sn);
if (groups != null && !groups.isEmpty()) {
groupService.batchAdd(groups);
}
catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), sn, null);
} catch (Exception e) {
log.warn("[同步通道] 直接入库失败,交由定时器兜底", e);
catalogDataCatch.setComplete(device.getDeviceId(), sn);
} finally {
lock.unlock();
}
}
}
}
} catch (Exception e) {
log.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest());
log.error("[收到通道] 异常内容: ", e);
} finally {
String deviceId = device.getDeviceId();
if (catalogDataCatch.size(deviceId, sn) > 0
&& catalogDataCatch.size(deviceId, sn) == catalogDataCatch.sumNum(deviceId, sn)) {
// 数据已经完整接收 此时可能存在某个设备离线变上线的情况但是考虑到性能此处不做处理
// 目前支持设备通道上线通知时和设备上线时向上级通知
int finalSn = sn;
Thread.startVirtualThread(() -> {
ReentrantLock lock = catalogDataCatch.getDeviceWriteLock(device.getDeviceId());
lock.lock();
try {
boolean resetChannelsResult = saveData(device, finalSn);
if (!resetChannelsResult) {
String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(deviceId, finalSn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(device.getDeviceId(), finalSn).size() + "";
catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, errorMsg);
} else {
catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, null);
}
} finally {
lock.unlock();
}
});
}
}
}
@Transactional
public boolean saveData(Device device, int sn) {
boolean result = true;
List<DeviceChannel> deviceChannelList = catalogDataCatch.getDeviceChannelList(device.getDeviceId(), sn);
if (deviceChannelList != null && !deviceChannelList.isEmpty()) {
result &= deviceChannelService.resetChannels(device.getId(), deviceChannelList);
}
List<Region> regionList = catalogDataCatch.getRegionList(device.getDeviceId(), sn);
if ( regionList!= null && !regionList.isEmpty()) {
result &= regionService.batchAdd(regionList);
}
List<Group> groupList = catalogDataCatch.getGroupList(device.getDeviceId(), sn);
if (groupList != null && !groupList.isEmpty()) {
result &= groupService.batchAdd(groupList);
}
return result;
}
@Override

View File

@ -861,7 +861,7 @@
320623,如东县,3206
320681,启东市,3206
320682,如皋市,3206
320684,海门,3206
320684,海门,3206
320685,海安市,3206
3207,连云港市,32
320703,连云区,3207
@ -918,8 +918,6 @@
33,浙江省,
3301,杭州市,33
330102,上城区,3301
330103,下城区,3301
330104,江干区,3301
330105,拱墅区,3301
330106,西湖区,3301
330108,滨江区,3301
@ -927,6 +925,8 @@
330110,余杭区,3301
330111,富阳区,3301
330112,临安区,3301
330113,临平区,3301
330114,钱塘区,3301
330122,桐庐县,3301
330127,淳安县,3301
330182,建德市,3301

1 编号 名称 上级
861 320623 如东县 3206
862 320681 启东市 3206
863 320682 如皋市 3206
864 320684 海门市 海门区 3206
865 320685 海安市 3206
866 3207 连云港市 32
867 320703 连云区 3207
918 33 浙江省
919 3301 杭州市 33
920 330102 上城区 3301
330103 下城区 3301
330104 江干区 3301
921 330105 拱墅区 3301
922 330106 西湖区 3301
923 330108 滨江区 3301
925 330110 余杭区 3301
926 330111 富阳区 3301
927 330112 临安区 3301
928 330113 临平区 3301
929 330114 钱塘区 3301
930 330122 桐庐县 3301
931 330127 淳安县 3301
932 330182 建德市 3301

View File

@ -202,23 +202,27 @@ DELIMITER ;
DELIMITER //
CREATE PROCEDURE `wvp_20260521`()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.STATISTICS
IF NOT EXISTS (SELECT 1
FROM information_schema.STATISTICS
WHERE TABLE_SCHEMA = (SELECT DATABASE())
AND TABLE_NAME = 'wvp_device_channel'
AND INDEX_NAME = 'uk_device_channel_source')
THEN
-- 先清理可能的重复数据
DELETE t1 FROM wvp_device_channel t1
INNER JOIN wvp_device_channel t2
WHERE t1.id < t2.id
AND t1.data_device_id = t2.data_device_id
AND t1.device_id = t2.device_id;
ALTER TABLE wvp_device_channel ADD UNIQUE INDEX uk_device_channel_source (data_device_id, device_id);
-- 用 GROUP BY + LEFT JOIN 替代自连接 DELETE
DELETE t1
FROM wvp_device_channel t1
LEFT JOIN (SELECT MAX(id) as id
FROM wvp_device_channel
GROUP BY data_device_id, device_id) t2 ON t1.id = t2.id
WHERE t2.id IS NULL;
ALTER TABLE wvp_device_channel
ADD UNIQUE INDEX uk_device_channel_source (data_device_id, device_id);
END IF;
END; //
DELIMITER ;
call wvp_20260521();
DROP PROCEDURE wvp_20260521;
DELIMITER ;