diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AlarmChannelMessage.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AlarmChannelMessage.java index 4b3bb132a..dd4465004 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AlarmChannelMessage.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AlarmChannelMessage.java @@ -16,12 +16,12 @@ public class AlarmChannelMessage { /** * 报警编号 */ - private int alarmSn; + private Integer alarmSn; /** * 告警类型 */ - private int alarmType; + private Integer alarmType; /** * 报警描述 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java index fb687b355..f06e13ea6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java @@ -19,8 +19,8 @@ public class CatalogData { private Instant time; private Device device; private String errorMsg; + private boolean complete; private Set redisKeysForChannel = new HashSet<>(); - private Set errorChannel = new HashSet<>(); private Set redisKeysForRegion = new HashSet<>(); private Set redisKeysForGroup = new HashSet<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/GbSteamIdentification.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/GbSteamIdentification.java deleted file mode 100644 index 63c17a80c..000000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/GbSteamIdentification.java +++ /dev/null @@ -1,44 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.bean; - -/** - * 码流索引标识 - */ -public enum GbSteamIdentification { - /** - * 主码流 stream:0 - * 子码流 stream:1s - */ - streamMain("stream", new String[]{"0","1"}), - /** - * 国标28181-2022定义的方式 - * 主码流 streamnumber:0 - * 子码流 streamnumber:1 - */ - streamnumber("streamnumber", new String[]{"0","1"}), - /** - * 主码流 streamprofile:0 - * 子码流 streamprofile:1 - */ - streamprofile("streamprofile", new String[]{"0","1"}), - /** - * 适用的品牌: TP-LINK - */ - streamMode("streamMode", new String[]{"main","sub"}), - ; - - GbSteamIdentification(String value, String[] indexArray) { - this.value = value; - this.indexArray = indexArray; - } - - private String value; - private String[] indexArray; - - public String getValue() { - return value; - } - - public String[] getIndexArray() { - return indexArray; - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/NotifyCatalogChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/NotifyCatalogChannel.java index 8961677d9..660110426 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/NotifyCatalogChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/NotifyCatalogChannel.java @@ -7,16 +7,19 @@ public class NotifyCatalogChannel { private DeviceChannel channel; + private String deviceId; + public enum Type { ADD, DELETE, UPDATE, STATUS_CHANGED } - public static NotifyCatalogChannel getInstance(Type type, DeviceChannel channel) { + public static NotifyCatalogChannel getInstance(Type type, DeviceChannel channel, String deviceId) { NotifyCatalogChannel notifyCatalogChannel = new NotifyCatalogChannel(); notifyCatalogChannel.setType(type); notifyCatalogChannel.setChannel(channel); + notifyCatalogChannel.setDeviceId(deviceId); return notifyCatalogChannel; } @@ -35,4 +38,12 @@ public class NotifyCatalogChannel { public void setChannel(DeviceChannel channel) { this.channel = channel; } + + public String getDeviceId() { + return deviceId; + } + + public void setDeviceId(String deviceId) { + this.deviceId = deviceId; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/PlatformCatalog.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/PlatformCatalog.java deleted file mode 100755 index 38ba2f03c..000000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/PlatformCatalog.java +++ /dev/null @@ -1,116 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.bean; - -import io.swagger.v3.oas.annotations.media.Schema; - -/** - * 国标级联-目录 - * @author lin - */ -@Schema(description = "目录信息") -public class PlatformCatalog { - @Schema(description = "ID") - private String id; - - @Schema(description = "名称") - private String name; - - @Schema(description = "平台ID") - private String platformId; - - @Schema(description = "父级目录ID") - private String parentId; - - @Schema(description = "行政区划") - private String civilCode; - - @Schema(description = "目录分组") - private String businessGroupId; - - /** - * 子节点数 - */ - @Schema(description = "子节点数") - private int childrenCount; - - /** - * 0 目录, 1 国标通道, 2 直播流 - */ - @Schema(description = "类型:0 目录, 1 国标通道, 2 直播流") - private int type; - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getPlatformId() { - return platformId; - } - - public void setPlatformId(String platformId) { - this.platformId = platformId; - } - - public String getParentId() { - return parentId; - } - - public void setParentId(String parentId) { - this.parentId = parentId; - } - - public int getChildrenCount() { - return childrenCount; - } - - public void setChildrenCount(int childrenCount) { - this.childrenCount = childrenCount; - } - - public int getType() { - return type; - } - - public void setType(int type) { - this.type = type; - } - - public void setTypeForCatalog() { - this.type = 0; - } - - public void setTypeForGb() { - this.type = 1; - } - - public void setTypeForStream() { - this.type = 2; - } - - public String getCivilCode() { - return civilCode; - } - - public void setCivilCode(String civilCode) { - this.civilCode = civilCode; - } - - public String getBusinessGroupId() { - return businessGroupId; - } - - public void setBusinessGroupId(String businessGroupId) { - this.businessGroupId = businessGroupId; - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/PlatformCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/PlatformCatch.java deleted file mode 100755 index db7ab9805..000000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/PlatformCatch.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.bean; - -import lombok.Data; - -@Data -public class PlatformCatch { - - private String id; - - /** - * 心跳未回复次数 - */ - private int keepAliveReply; - - // 注册未回复次数 - private int registerAliveReply; - - private String callId; - - private Platform platform; - - private SipTransactionInfo sipTransactionInfo; - -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java index cc5c46166..529b520fc 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java @@ -404,12 +404,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { allChannelMap.remove(deviceChannel.getDataDeviceId() + deviceChannel.getDeviceId()); upsertChannels.add(deviceChannel); if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) { - if (subContMap.get(deviceChannel.getParentId()) == null) { - subContMap.put(deviceChannel.getParentId(), 1); - } else { - Integer count = subContMap.get(deviceChannel.getParentId()); - subContMap.put(deviceChannel.getParentId(), count++); - } + subContMap.merge(deviceChannel.getParentId(), 1, Integer::sum); } } deleteChannels.addAll(allChannelMap.values()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 21b43849e..950d7db14 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -750,13 +750,16 @@ public class DeviceServiceImpl implements IDeviceService { @Override public void sync(Device device) { - if (catalogResponseMessageHandler.isSyncRunning(device.getDeviceId())) { - SyncStatus syncStatus = catalogResponseMessageHandler.getChannelSyncProgress(device.getDeviceId()); - log.info("[同步通道] 同步已存在, 设备: {}, 同步信息: {}", device.getDeviceId(), JSON.toJSON(syncStatus)); - return; + int sn; + synchronized (device.getDeviceId().intern()) { + if (catalogResponseMessageHandler.isSyncRunning(device.getDeviceId())) { + SyncStatus syncStatus = catalogResponseMessageHandler.getChannelSyncProgress(device.getDeviceId()); + log.info("[同步通道] 同步已存在, 设备: {}, 同步信息: {}", device.getDeviceId(), JSON.toJSON(syncStatus)); + return; + } + sn = (int)((Math.random()*9+1)*100000); + catalogResponseMessageHandler.setChannelSyncReady(device, sn); } - int sn = (int)((Math.random()*9+1)*100000); - catalogResponseMessageHandler.setChannelSyncReady(device, sn); try { sipCommander.catalogQuery(device, sn, event -> { String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java index 61e7e10db..15316c7e0 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java @@ -393,25 +393,30 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { int result = platformChannelMapper.addChannels(platformId, channelList); if (result > 0) { // 查询通道相关的行政区划信息是否共享,如果没共享就添加 - Set regionListNotShare = getRegionNotShareByChannelList(channelList, platformId); - if (!regionListNotShare.isEmpty()) { - int addGroupResult = platformChannelMapper.addPlatformRegion(new ArrayList<>(regionListNotShare), platformId); - if (addGroupResult > 0) { - for (Region region : regionListNotShare) { - // 分组信息排序时需要将顶层排在最后 - channelList.add(0, CommonGBChannel.build(region)); + // 判断平台是否客气了推送行政区划 + if (platform.getCatalogWithRegion() != 0) { + Set regionListNotShare = getRegionNotShareByChannelList(channelList, platformId); + if (!regionListNotShare.isEmpty()) { + int addGroupResult = platformChannelMapper.addPlatformRegion(new ArrayList<>(regionListNotShare), platformId); + if (addGroupResult > 0) { + for (Region region : regionListNotShare) { + // 分组信息排序时需要将顶层排在最后 + channelList.addFirst(CommonGBChannel.build(region)); + } } } } - // 查询通道相关的分组信息是否共享,如果没共享就添加 - Set groupListNotShare = getGroupNotShareByChannelList(channelList, platformId); - if (!groupListNotShare.isEmpty()) { - int addGroupResult = platformChannelMapper.addPlatformGroup(new ArrayList<>(groupListNotShare), platformId); - if (addGroupResult > 0) { - for (Group group : groupListNotShare) { - // 分组信息排序时需要将顶层排在最后 - channelList.add(0, CommonGBChannel.build(group)); + if (platform.getCatalogWithGroup() != 0) { + // 查询通道相关的分组信息是否共享,如果没共享就添加 + Set groupListNotShare = getGroupNotShareByChannelList(channelList, platformId); + if (!groupListNotShare.isEmpty()) { + int addGroupResult = platformChannelMapper.addPlatformGroup(new ArrayList<>(groupListNotShare), platformId); + if (addGroupResult > 0) { + for (Group group : groupListNotShare) { + // 分组信息排序时需要将顶层排在最后 + channelList.addFirst(CommonGBChannel.build(group)); + } } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java index 451467492..c5a53b424 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java @@ -16,6 +16,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Iterator; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -56,29 +57,17 @@ public class CatalogDataManager{ } public void addReady(Device device, int sn ) { - CatalogData catalogData = dataMap.get(buildMapKey(device.getDeviceId(),sn)); - if (catalogData != null) { - Set redisKeysForChannel = catalogData.getRedisKeysForChannel(); - if (redisKeysForChannel != null && !redisKeysForChannel.isEmpty()) { - for (String deleteKey : redisKeysForChannel) { - redisTemplate.opsForHash().delete(key, deleteKey); - } + // 清除该设备的所有旧条目 + Iterator> it = dataMap.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + CatalogData old = entry.getValue(); + if (old != null && device.getDeviceId().equals(old.getDevice().getDeviceId())) { + deleteRedisKeys(old); + it.remove(); } - Set redisKeysForRegion = catalogData.getRedisKeysForRegion(); - if (redisKeysForRegion != null && !redisKeysForRegion.isEmpty()) { - for (String deleteKey : redisKeysForRegion) { - redisTemplate.opsForHash().delete(key, deleteKey); - } - } - Set redisKeysForGroup = catalogData.getRedisKeysForGroup(); - if (redisKeysForGroup != null && !redisKeysForGroup.isEmpty()) { - for (String deleteKey : redisKeysForGroup) { - redisTemplate.opsForHash().delete(key, deleteKey); - } - } - dataMap.remove(buildMapKey(device.getDeviceId(),sn)); } - catalogData = new CatalogData(); + CatalogData catalogData = new CatalogData(); catalogData.setDevice(device); catalogData.setSn(sn); catalogData.setStatus(CatalogData.CatalogDataStatus.ready); @@ -86,6 +75,27 @@ public class CatalogDataManager{ dataMap.put(buildMapKey(device.getDeviceId(),sn), catalogData); } + private void deleteRedisKeys(CatalogData catalogData) { + Set redisKeysForChannel = catalogData.getRedisKeysForChannel(); + if (redisKeysForChannel != null && !redisKeysForChannel.isEmpty()) { + for (String deleteKey : redisKeysForChannel) { + redisTemplate.opsForHash().delete(key, deleteKey); + } + } + Set redisKeysForRegion = catalogData.getRedisKeysForRegion(); + if (redisKeysForRegion != null && !redisKeysForRegion.isEmpty()) { + for (String deleteKey : redisKeysForRegion) { + redisTemplate.opsForHash().delete(key, deleteKey); + } + } + Set redisKeysForGroup = catalogData.getRedisKeysForGroup(); + if (redisKeysForGroup != null && !redisKeysForGroup.isEmpty()) { + for (String deleteKey : redisKeysForGroup) { + redisTemplate.opsForHash().delete(key, deleteKey); + } + } + } + public void put(String deviceId, int sn, int total, Device device, List deviceChannelList, List regionList, List groupList) { CatalogData catalogData = dataMap.get(buildMapKey(device.getDeviceId(),sn)); @@ -188,10 +198,6 @@ public class CatalogDataManager{ }else { syncStatus.setSyncIng(true); } - if (catalogData.getErrorMsg() != null) { - // 失败的同步信息,返回一次后直接移除 - dataMap.remove(key); - } return syncStatus; } } @@ -249,20 +255,19 @@ public class CatalogDataManager{ catalogData.setStatus(CatalogData.CatalogDataStatus.end); } }else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) { - if ( catalogData.getTime().isBefore(instantBefore5S)) { + boolean complete = catalogData.isComplete(); + boolean timeout = catalogData.getTime().isBefore(instantBefore5S); + if (complete || timeout) { String deviceId = catalogData.getDevice().getDeviceId(); ReentrantLock lock = getDeviceWriteLock(deviceId); if (!lock.tryLock()) { - // saveData() 正在执行,跳过本次,等下一个5s周期 continue; } try { int sn = catalogData.getSn(); List deviceChannelList = getDeviceChannelList(deviceId, sn); - if (catalogData.getTotal() == deviceChannelList.size()) { + if (!deviceChannelList.isEmpty()) { deviceChannelService.resetChannels(catalogData.getDevice().getId(), deviceChannelList); - } else { - deviceChannelService.updateChannels(catalogData.getDevice(), deviceChannelList); } List regionList = getRegionList(deviceId, sn); if ( regionList!= null && !regionList.isEmpty()) { @@ -272,10 +277,10 @@ public class CatalogDataManager{ if (groupList != null && !groupList.isEmpty()) { groupService.batchAdd(groupList); } - String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + deviceChannelList.size() + "条"; - catalogData.setErrorMsg(errorMsg); + catalogData.setErrorMsg(null); } catch (Exception e) { log.error("[国标通道同步] 入库失败: ", e); + catalogData.setErrorMsg("入库失败: " + e.getMessage()); } finally { lock.unlock(); } @@ -289,30 +294,21 @@ public class CatalogDataManager{ if (deviceWriteLocks.containsKey(deviceId)) { deviceWriteLocks.remove(deviceId); } - Set redisKeysForChannel = catalogData.getRedisKeysForChannel(); - if (redisKeysForChannel != null && !redisKeysForChannel.isEmpty()) { - for (String deleteKey : redisKeysForChannel) { - redisTemplate.opsForHash().delete(key, deleteKey); - } - } - Set redisKeysForRegion = catalogData.getRedisKeysForRegion(); - if (redisKeysForRegion != null && !redisKeysForRegion.isEmpty()) { - for (String deleteKey : redisKeysForRegion) { - redisTemplate.opsForHash().delete(key, deleteKey); - } - } - Set redisKeysForGroup = catalogData.getRedisKeysForGroup(); - if (redisKeysForGroup != null && !redisKeysForGroup.isEmpty()) { - for (String deleteKey : redisKeysForGroup) { - redisTemplate.opsForHash().delete(key, deleteKey); - } - } + deleteRedisKeys(catalogData); } } } } + public void setComplete(String deviceId, int sn) { + CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn)); + if (catalogData == null) { + return; + } + catalogData.setComplete(true); + } + public void setChannelSyncEnd(String deviceId, int sn, String errorMsg) { CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn)); if (catalogData == null) { @@ -328,7 +324,7 @@ public class CatalogDataManager{ if (catalogData == null) { return 0; } - return catalogData.getRedisKeysForChannel().size() + catalogData.getErrorChannel().size(); + return catalogData.getRedisKeysForChannel().size(); } public int sumNum(String deviceId, int sn) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index a0b8fe939..88ee9a3fa 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; +import com.genersoft.iot.vmp.gb28181.session.CatalogDataManager; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -26,9 +27,12 @@ import javax.sip.header.FromHeader; import java.lang.reflect.InvocationTargetException; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantLock; /** * SIP命令类型: NOTIFY请求中的目录请求处理 @@ -56,6 +60,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent @Autowired private IGbChannelService channelService; + @Autowired + private CatalogDataManager catalogDataManager; + public void process(RequestEvent evt) { if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { @@ -149,7 +156,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent // 上线 log.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId()); channel.setStatus("ON"); - channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel)); + channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel, device.getDeviceId())); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 @@ -163,7 +170,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent log.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId()); } else { channel.setStatus("OFF"); - channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel)); + channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel, device.getDeviceId())); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), false); @@ -177,7 +184,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent log.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId()); } else { channel.setStatus("OFF"); - channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel)); + channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel, device.getDeviceId())); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 @@ -192,7 +199,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent log.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId()); } else { channel.setStatus("OFF"); - channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel)); + channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel, device.getDeviceId())); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 @@ -210,12 +217,12 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent channel.setId(deviceChannel.getId()); channel.setHasAudio(deviceChannel.isHasAudio()); channel.setUpdateTime(DateUtil.getNow()); - channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.UPDATE, channel)); + channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.UPDATE, channel, device.getDeviceId())); } else { catalogChannelEvent.getChannel().setUpdateTime(DateUtil.getNow()); catalogChannelEvent.getChannel().setCreateTime(DateUtil.getNow()); - channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.ADD, channel)); + channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.ADD, channel, device.getDeviceId())); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 @@ -227,7 +234,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent case CatalogEvent.DEL: // 删除 log.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId()); - channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.DELETE, channel)); + channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.DELETE, channel, device.getDeviceId())); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 @@ -244,12 +251,12 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent channel.setHasAudio(deviceChannelForUpdate.isHasAudio()); channel.setUpdateTime(DateUtil.getNow()); channel.setUpdateTime(DateUtil.getNow()); - channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.UPDATE, channel)); + channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.UPDATE, channel, device.getDeviceId())); } else { catalogChannelEvent.getChannel().setCreateTime(DateUtil.getNow()); catalogChannelEvent.getChannel().setUpdateTime(DateUtil.getNow()); - channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.ADD, channel)); + channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.ADD, channel, device.getDeviceId())); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 @@ -281,38 +288,51 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent channelListForSave.add(channelList.poll()); } - for (NotifyCatalogChannel notifyCatalogChannel : channelListForSave) { + Map> grouped = new HashMap<>(); + for (NotifyCatalogChannel item : channelListForSave) { + grouped.computeIfAbsent(item.getDeviceId(), k -> new ArrayList<>()).add(item); + } + + for (Map.Entry> entry : grouped.entrySet()) { + ReentrantLock lock = catalogDataManager.getDeviceWriteLock(entry.getKey()); + lock.lock(); try { - switch (notifyCatalogChannel.getType()) { - case STATUS_CHANGED: - deviceChannelService.updateChannelStatusForNotify(notifyCatalogChannel.getChannel()); - CommonGBChannel channelForStatus = channelService.queryCommonChannelByDeviceChannel(notifyCatalogChannel.getChannel()); - if ("ON".equals(notifyCatalogChannel.getChannel().getStatus()) ) { - eventPublisher.channelEventPublish(channelForStatus, ChannelEvent.ChannelEventMessageType.ON); - }else { - eventPublisher.channelEventPublish(channelForStatus, ChannelEvent.ChannelEventMessageType.OFF); + for (NotifyCatalogChannel notifyCatalogChannel : entry.getValue()) { + try { + switch (notifyCatalogChannel.getType()) { + case STATUS_CHANGED: + deviceChannelService.updateChannelStatusForNotify(notifyCatalogChannel.getChannel()); + CommonGBChannel channelForStatus = channelService.queryCommonChannelByDeviceChannel(notifyCatalogChannel.getChannel()); + if ("ON".equals(notifyCatalogChannel.getChannel().getStatus()) ) { + eventPublisher.channelEventPublish(channelForStatus, ChannelEvent.ChannelEventMessageType.ON); + }else { + eventPublisher.channelEventPublish(channelForStatus, ChannelEvent.ChannelEventMessageType.OFF); + } + break; + case ADD: + deviceChannelService.addChannel(notifyCatalogChannel.getChannel()); + CommonGBChannel channelForAdd = channelService.getOne(notifyCatalogChannel.getChannel().getId()); + eventPublisher.channelEventPublish(channelForAdd, ChannelEvent.ChannelEventMessageType.ADD); + break; + case UPDATE: + CommonGBChannel oldCommonChannel = channelService.getOne(notifyCatalogChannel.getChannel().getId()); + deviceChannelService.updateChannelForNotify(notifyCatalogChannel.getChannel()); + CommonGBChannel channel = channelService.getOne(oldCommonChannel.getGbId()); + eventPublisher.channelEventPublishForUpdate(channel, oldCommonChannel); + break; + case DELETE: + CommonGBChannel oldCommonChannelForDelete = channelService.queryCommonChannelByDeviceChannel(notifyCatalogChannel.getChannel()); + deviceChannelService.deleteForNotify(notifyCatalogChannel.getChannel()); + eventPublisher.channelEventPublish(oldCommonChannelForDelete, ChannelEvent.ChannelEventMessageType.DEL); + break; } - break; - case ADD: - deviceChannelService.addChannel(notifyCatalogChannel.getChannel()); - CommonGBChannel channelForAdd = channelService.getOne(notifyCatalogChannel.getChannel().getId()); - eventPublisher.channelEventPublish(channelForAdd, ChannelEvent.ChannelEventMessageType.ADD); - break; - case UPDATE: - CommonGBChannel oldCommonChannel = channelService.getOne(notifyCatalogChannel.getChannel().getId()); - deviceChannelService.updateChannelForNotify(notifyCatalogChannel.getChannel()); - CommonGBChannel channel = channelService.getOne(oldCommonChannel.getGbId()); - eventPublisher.channelEventPublishForUpdate(channel, oldCommonChannel); - break; - case DELETE: - CommonGBChannel oldCommonChannelForDelete = channelService.queryCommonChannelByDeviceChannel(notifyCatalogChannel.getChannel()); - deviceChannelService.deleteForNotify(notifyCatalogChannel.getChannel()); - eventPublisher.channelEventPublish(oldCommonChannelForDelete, ChannelEvent.ChannelEventMessageType.DEL); - break; + }catch (Exception e) { + log.error("[存储收到的通道-异常]类型:{},编号:{}", notifyCatalogChannel.getType(), + notifyCatalogChannel.getChannel().getDeviceId(), e); + } } - }catch (Exception e) { - log.error("[存储收到的通道-异常]类型:{},编号:{}", notifyCatalogChannel.getType(), - notifyCatalogChannel.getChannel().getDeviceId(), e); + } finally { + lock.unlock(); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 417245d5f..fede05cfa 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -3,8 +3,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respon import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; -import com.genersoft.iot.vmp.gb28181.service.IGroupService; -import com.genersoft.iot.vmp.gb28181.service.IRegionService; import com.genersoft.iot.vmp.gb28181.session.CatalogDataManager; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; @@ -17,7 +15,6 @@ import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; @@ -28,9 +25,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.locks.ReentrantLock; -import java.lang.Thread; /** * 目录查询的回复 @@ -44,17 +38,9 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @Autowired private ResponseMessageHandler responseMessageHandler; - private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - @Autowired private IDeviceChannelService deviceChannelService; - @Autowired - private IRegionService regionService; - - @Autowired - private IGroupService groupService; - @Autowired private CatalogDataManager catalogDataCatch; @@ -155,58 +141,19 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device, channelList, regionList, groupList); log.info("[收到通道]设备: {} -> {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.size(device.getDeviceId(), sn), sumNum); + + if (catalogDataCatch.size(device.getDeviceId(), sn) > 0 + && catalogDataCatch.size(device.getDeviceId(), sn) == catalogDataCatch.sumNum(device.getDeviceId(), sn)) { + catalogDataCatch.setComplete(device.getDeviceId(), sn); + } } } } catch (Exception e) { log.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest()); log.error("[收到通道] 异常内容: ", e); - } finally { - String deviceId = device.getDeviceId(); - if (catalogDataCatch.size(deviceId, sn) > 0 - && catalogDataCatch.size(deviceId, sn) == catalogDataCatch.sumNum(deviceId, sn)) { - // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理, - // 目前支持设备通道上线通知时和设备上线时向上级通知 - int finalSn = sn; - Thread.startVirtualThread(() -> { - ReentrantLock lock = catalogDataCatch.getDeviceWriteLock(device.getDeviceId()); - lock.lock(); - try { - boolean resetChannelsResult = saveData(device, finalSn); - if (!resetChannelsResult) { - String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(deviceId, finalSn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(device.getDeviceId(), finalSn).size() + "条"; - catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, errorMsg); - } else { - catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, null); - } - } finally { - lock.unlock(); - } - }); - } } } - @Transactional - public boolean saveData(Device device, int sn) { - - boolean result = true; - List deviceChannelList = catalogDataCatch.getDeviceChannelList(device.getDeviceId(), sn); - if (deviceChannelList != null && !deviceChannelList.isEmpty()) { - result &= deviceChannelService.resetChannels(device.getId(), deviceChannelList); - } - - List regionList = catalogDataCatch.getRegionList(device.getDeviceId(), sn); - if ( regionList!= null && !regionList.isEmpty()) { - result &= regionService.batchAdd(regionList); - } - - List groupList = catalogDataCatch.getGroupList(device.getDeviceId(), sn); - if (groupList != null && !groupList.isEmpty()) { - result &= groupService.batchAdd(groupList); - } - return result; - } - @Override public void handForPlatform(RequestEvent evt, Platform parentPlatform, Element rootElement) {