Compare commits

...

6 Commits

Author SHA1 Message Date
阿斌
5f1ef95128
Pre Merge pull request !36 from 阿斌/N/A 2025-11-19 09:04:16 +00:00
lin
a97a1113cf 通道编辑修复分组路径显示异常,支持行政区划路径显示 2025-11-19 17:04:01 +08:00
lin
033db1925c 通道编辑修复分组路径显示异常,支持行政区划路径显示 2025-11-19 17:03:45 +08:00
lin
c1c9c7957b 调整通道列表排序 2025-11-18 12:30:51 +08:00
lin
51a7ae056e 修复翻页未重置页码的问题 2025-11-18 12:16:20 +08:00
阿斌
da98101aac
update src/main/resources/civilCode.csv.
行政规划错误。江苏南通海门市,修改为海门区,浙江杭州删除下城区、江干区,新增钱塘区,临平区

Signed-off-by: 阿斌 <38912748@qq.com>
2024-12-15 08:58:42 +00:00
32 changed files with 383 additions and 185 deletions

View File

@ -396,4 +396,56 @@ public class CommonGBChannel {
return commonGBChannel;
}
@Override
public String toString() {
return "CommonGBChannel{" +
"gbId=" + gbId +
", gbDeviceId='" + gbDeviceId + '\'' +
", gbName='" + gbName + '\'' +
", gbManufacturer='" + gbManufacturer + '\'' +
", gbModel='" + gbModel + '\'' +
", gbOwner='" + gbOwner + '\'' +
", gbCivilCode='" + gbCivilCode + '\'' +
", gbBlock='" + gbBlock + '\'' +
", gbAddress='" + gbAddress + '\'' +
", gbParental=" + gbParental +
", gbParentId='" + gbParentId + '\'' +
", gbSafetyWay=" + gbSafetyWay +
", gbRegisterWay=" + gbRegisterWay +
", gbCertNum='" + gbCertNum + '\'' +
", gbCertifiable=" + gbCertifiable +
", gbErrCode=" + gbErrCode +
", gbEndTime='" + gbEndTime + '\'' +
", gbSecrecy=" + gbSecrecy +
", gbIpAddress='" + gbIpAddress + '\'' +
", gbPort=" + gbPort +
", gbPassword='" + gbPassword + '\'' +
", gbStatus='" + gbStatus + '\'' +
", gbLongitude=" + gbLongitude +
", gbLatitude=" + gbLatitude +
", gpsAltitude=" + gpsAltitude +
", gpsSpeed=" + gpsSpeed +
", gpsDirection=" + gpsDirection +
", gpsTime='" + gpsTime + '\'' +
", gbBusinessGroupId='" + gbBusinessGroupId + '\'' +
", gbPtzType=" + gbPtzType +
", gbPositionType=" + gbPositionType +
", gbRoomType=" + gbRoomType +
", gbUseType=" + gbUseType +
", gbSupplyLightType=" + gbSupplyLightType +
", gbDirectionType=" + gbDirectionType +
", gbResolution='" + gbResolution + '\'' +
", gbDownloadSpeed='" + gbDownloadSpeed + '\'' +
", gbSvcSpaceSupportMod=" + gbSvcSpaceSupportMod +
", gbSvcTimeSupportMode=" + gbSvcTimeSupportMode +
", recordPLan=" + recordPLan +
", dataType=" + dataType +
", dataDeviceId=" + dataDeviceId +
", createTime='" + createTime + '\'' +
", updateTime='" + updateTime + '\'' +
", streamId='" + streamId + '\'' +
", enableBroadcast=" + enableBroadcast +
", mapLevel=" + mapLevel +
'}';
}
}

View File

@ -32,7 +32,8 @@ public class SubscribeHolder {
public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) {
log.info("[国标级联] 添加目录订阅,平台: {} 有效期: {}", platformId, subscribeInfo.getExpires());
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platformId);
subscribeInfo.setServerId(userSetting.getServerId());
String key = String.format("%s:%s:%s", prefix, "catalog", platformId);
if (subscribeInfo.getExpires() > 0) {
Duration duration = Duration.ofSeconds(subscribeInfo.getExpires());
redisTemplate.opsForValue().set(key, subscribeInfo, duration);
@ -42,18 +43,19 @@ public class SubscribeHolder {
}
public SubscribeInfo getCatalogSubscribe(String platformId) {
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platformId);
String key = String.format("%s:%s:%s", prefix, "catalog", platformId);
return (SubscribeInfo)redisTemplate.opsForValue().get(key);
}
public void removeCatalogSubscribe(String platformId) {
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platformId);
String key = String.format("%s:%s:%s", prefix, "catalog", platformId);
redisTemplate.delete(key);
}
public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo, Runnable gpsTask) {
log.info("[国标级联] 添加移动位置订阅,平台: {} 有效期: {}s", platformId, subscribeInfo.getExpires());
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platformId);
subscribeInfo.setServerId(userSetting.getServerId());
String key = String.format("%s:%s:%s", prefix, "mobilePosition", platformId);
if (subscribeInfo.getExpires() > 0) {
Duration duration = Duration.ofSeconds(subscribeInfo.getExpires());
redisTemplate.opsForValue().set(key, subscribeInfo, duration);
@ -81,12 +83,12 @@ public class SubscribeHolder {
}
public SubscribeInfo getMobilePositionSubscribe(String platformId) {
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platformId);
String key = String.format("%s:%s:%s", prefix, "mobilePosition", platformId);
return (SubscribeInfo)redisTemplate.opsForValue().get(key);
}
public void removeMobilePositionSubscribe(String platformId) {
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platformId);
String key = String.format("%s:%s:%s", prefix, "mobilePosition", platformId);
redisTemplate.delete(key);
}
@ -96,7 +98,7 @@ public class SubscribeHolder {
}
List<String> result = new ArrayList<>();
for (Platform platform : platformList) {
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platform.getServerGBId());
String key = String.format("%s:%s:%s", prefix, "catalog", platform.getServerGBId());
if (redisTemplate.hasKey(key)) {
result.add(platform.getServerGBId());
}
@ -110,7 +112,7 @@ public class SubscribeHolder {
}
List<String> result = new ArrayList<>();
for (Platform platform : platformList) {
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platform.getServerGBId());
String key = String.format("%s:%s:%s", prefix, "mobilePosition", platform.getServerGBId());
if (redisTemplate.hasKey(key)) {
result.add(platform.getServerGBId());
}

View File

@ -37,6 +37,11 @@ public class SubscribeInfo {
*/
private String simulatedCallId;
/**
* 来源serverId
*/
private String serverId;
public static SubscribeInfo getInstance(SIPResponse response, String id, int expires, EventHeader eventHeader){
SubscribeInfo subscribeInfo = new SubscribeInfo();

View File

@ -22,13 +22,13 @@ public interface DeviceChannelMapper {
"insert into wvp_device_channel " +
"(device_id, data_type, data_device_id, name, manufacturer, model, owner, civil_code, block, " +
"address, parental, parent_id, safety_way, register_way, cert_num, certifiable, err_code, end_time, secrecy, " +
"ip_address, port, password, status, longitude, latitude, gb_longitude, gb_latitude, ptz_type, position_type, room_type, use_type, " +
"ip_address, port, password, status, longitude, latitude, ptz_type, position_type, room_type, use_type, " +
"supply_light_type, direction_type, resolution, business_group_id, download_speed, svc_space_support_mod, " +
"svc_time_support_mode, create_time, update_time, sub_count, stream_id, has_audio, gps_time, stream_identification, channel_type) " +
"values " +
"(#{deviceId}, #{dataType}, #{dataDeviceId}, #{name}, #{manufacturer}, #{model}, #{owner}, #{civilCode}, #{block}, " +
"#{address}, #{parental}, #{parentId}, #{safetyWay}, #{registerWay}, #{certNum}, #{certifiable}, #{errCode}, #{endTime}, #{secrecy}, " +
"#{ipAddress}, #{port}, #{password}, #{status}, #{longitude}, #{latitude}, #{gbLongitude}, #{gbLatitude}, #{ptzType}, #{positionType}, #{roomType}, #{useType}, " +
"#{ipAddress}, #{port}, #{password}, #{status}, #{longitude}, #{latitude}, #{ptzType}, #{positionType}, #{roomType}, #{useType}, " +
"#{supplyLightType}, #{directionType}, #{resolution}, #{businessGroupId}, #{downloadSpeed}, #{svcSpaceSupportMod}," +
" #{svcTimeSupportMode}, #{createTime}, #{updateTime}, #{subCount}, #{streamId}, #{hasAudio}, #{gpsTime}, #{streamIdentification}, #{channelType}) " +
"</script>")
@ -63,8 +63,6 @@ public interface DeviceChannelMapper {
", status=#{status}" +
", longitude=#{longitude}" +
", latitude=#{latitude}" +
", gb_longitude=#{gbLongitude}" +
", gb_latitude=#{gbLatitude}" +
", ptz_type=#{ptzType}" +
", position_type=#{positionType}" +
", room_type=#{roomType}" +
@ -200,14 +198,14 @@ public interface DeviceChannelMapper {
"insert into wvp_device_channel " +
"(device_id, data_type, data_device_id, name, manufacturer, model, owner, civil_code, block, " +
"address, parental, parent_id, safety_way, register_way, cert_num, certifiable, err_code, end_time, secrecy, " +
"ip_address, port, password, status, longitude, latitude, gb_longitude, gb_latitude, ptz_type, position_type, room_type, use_type, " +
"ip_address, port, password, status, longitude, latitude, ptz_type, position_type, room_type, use_type, " +
"supply_light_type, direction_type, resolution, business_group_id, download_speed, svc_space_support_mod, " +
"svc_time_support_mode, create_time, update_time, sub_count, stream_id, has_audio, gps_time, stream_identification, channel_type) " +
"values " +
"<foreach collection='addChannels' index='index' item='item' separator=','> " +
"(#{item.deviceId}, #{item.dataType}, #{item.dataDeviceId}, #{item.name}, #{item.manufacturer}, #{item.model}, #{item.owner}, #{item.civilCode}, #{item.block}, " +
"#{item.address}, #{item.parental}, #{item.parentId}, #{item.safetyWay}, #{item.registerWay}, #{item.certNum}, #{item.certifiable}, #{item.errCode}, #{item.endTime}, #{item.secrecy}, " +
"#{item.ipAddress}, #{item.port}, #{item.password}, #{item.status}, #{item.longitude}, #{item.latitude}, #{item.gbLongitude}, #{item.gbLatitude}, #{item.ptzType}, #{item.positionType}, #{item.roomType}, #{item.useType}, " +
"#{item.ipAddress}, #{item.port}, #{item.password}, #{item.status}, #{item.longitude}, #{item.latitude}, #{item.ptzType}, #{item.positionType}, #{item.roomType}, #{item.useType}, " +
"#{item.supplyLightType}, #{item.directionType}, #{item.resolution}, #{item.businessGroupId}, #{item.downloadSpeed}, #{item.svcSpaceSupportMod}," +
" #{item.svcTimeSupportMode}, #{item.createTime}, #{item.updateTime}, #{item.subCount}, #{item.streamId}, #{item.hasAudio}, #{item.gpsTime}, #{item.streamIdentification}, #{item.channelType}) " +
"</foreach> " +
@ -534,8 +532,6 @@ public interface DeviceChannelMapper {
", status=#{status}" +
", longitude=#{longitude}" +
", latitude=#{latitude}" +
", gb_longitude=#{gbLongitude}" +
", gb_latitude=#{gbLatitude}" +
", ptz_type=#{ptzType}" +
", position_type=#{positionType}" +
", room_type=#{roomType}" +

View File

@ -328,11 +328,26 @@ public interface GroupMapper {
@Delete("DELETE FROM wvp_common_group where alias is not null")
void deleteHasAlias();
@Update(" UPDATE wvp_common_group g1" +
@Update(value = " UPDATE wvp_common_group g1" +
" JOIN wvp_common_group g2" +
" ON g1.parent_device_id = g2.device_id" +
" SET g1.parent_id = g2.id" +
" WHERE g1.alias IS NOT NULL;")
" WHERE g1.alias IS NOT NULL;", databaseId = "mysql")
@Update(value = " UPDATE wvp_common_group g1" +
" JOIN wvp_common_group g2" +
" ON g1.parent_device_id = g2.device_id" +
" SET g1.parent_id = g2.id" +
" WHERE g1.alias IS NOT NULL;", databaseId = "h2")
@Update(value = " UPDATE wvp_common_group AS g1" +
" SET parent_id = g2.id" +
" FROM wvp_common_group AS g2" +
" WHERE g1.parent_device_id = g2.device_id" +
" AND g1.alias IS NOT NULL;", databaseId = "kingbase")
@Update(value = " UPDATE wvp_common_group AS g1" +
" SET parent_id = g2.id" +
" FROM wvp_common_group AS g2" +
" WHERE g1.parent_device_id = g2.device_id" +
" AND g1.alias IS NOT NULL;", databaseId = "postgresql")
void fixParentId();
}

View File

@ -295,6 +295,7 @@ public class ChannelProvider {
if (params.get("parentDeviceId") != null) {
sqlBuild.append(" AND coalesce(gb_parent_id, parent_id) = #{parentDeviceId}");
}
sqlBuild.append(" order by create_time desc");
return sqlBuild.toString();
}

View File

@ -65,16 +65,19 @@ public class EventPublisher {
}
public void channelEventPublishForUpdate(CommonGBChannel commonGBChannel, CommonGBChannel deviceChannelForOld) {
log.info("[通道改变内部分发-更新] {}", commonGBChannel.getGbDeviceId());
ChannelEvent channelEvent = ChannelEvent.getInstanceForUpdate(this, Collections.singletonList(commonGBChannel), Collections.singletonList(deviceChannelForOld));
applicationEventPublisher.publishEvent(channelEvent);
}
public void channelEventPublishForUpdate(List<CommonGBChannel> channelList, List<CommonGBChannel> channelListForOld) {
log.info("[通道改变内部分发-更新] 数量: {}", channelList.size());
ChannelEvent channelEvent = ChannelEvent.getInstanceForUpdate(this, channelList, channelListForOld);
applicationEventPublisher.publishEvent(channelEvent);
}
public void channelEventPublish(List<CommonGBChannel> channelList, ChannelEvent.ChannelEventMessageType type) {
log.info("[通道改变内部分发-{}] 数量: {}", type, channelList.size());
ChannelEvent channelEvent = ChannelEvent.getInstance(this, type, channelList);
applicationEventPublisher.publishEvent(channelEvent);
}

View File

@ -24,11 +24,11 @@ public interface IGbChannelService {
int offline(CommonGBChannel commonGBChannel);
int offline(List<CommonGBChannel> commonGBChannelList);
int offline(List<CommonGBChannel> commonGBChannelList, boolean permission);
int online(CommonGBChannel commonGBChannel);
int online(List<CommonGBChannel> commonGBChannelList);
int online(List<CommonGBChannel> commonGBChannelList, boolean permission);
void batchAdd(List<CommonGBChannel> commonGBChannels);
@ -78,7 +78,7 @@ public interface IGbChannelService {
void deleteChannelToGroupByGbDevice(List<Integer> deviceIds);
void batchUpdate(List<CommonGBChannel> commonGBChannels);
void batchUpdateForStreamPushRedisMsg(List<CommonGBChannel> commonGBChannels, boolean permission);
CommonGBChannel queryOneWithPlatform(Integer platformId, String channelDeviceId);

View File

@ -187,7 +187,7 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
@Override
public int update(CommonGBChannel commonGBChannel) {
log.info("[更新通道] 通道ID: {}, ", commonGBChannel.getGbId());
log.info("[更新通道] 通道ID: {}, ", commonGBChannel.toString());
if (commonGBChannel.getGbId() <= 0) {
log.warn("[更新通道] 未找到数据库ID更新失败 {}({})", commonGBChannel.getGbName(), commonGBChannel.getGbDeviceId());
return 0;
@ -248,32 +248,33 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
@Override
@Transactional
public int offline(List<CommonGBChannel> commonGBChannelList) {
public int offline(List<CommonGBChannel> commonGBChannelList, boolean permission) {
if (commonGBChannelList.isEmpty()) {
log.warn("[多个通道离线] 通道数量为0更新失败");
return 0;
}
log.info("[通道离线] 共 {} 个", commonGBChannelList.size());
int limitCount = 1000;
int result = 0;
if (commonGBChannelList.size() > limitCount) {
for (int i = 0; i < commonGBChannelList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > commonGBChannelList.size()) {
toIndex = commonGBChannelList.size();
if (permission) {
int limitCount = 1000;
if (commonGBChannelList.size() > limitCount) {
for (int i = 0; i < commonGBChannelList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > commonGBChannelList.size()) {
toIndex = commonGBChannelList.size();
}
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "OFF");
}
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "OFF");
} else {
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "OFF");
}
} else {
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "OFF");
log.info("[通道离线] 保存入库 共 {} 个改变", result);
}
if (result > 0) {
try {
// 发送catalog
eventPublisher.channelEventPublish(commonGBChannelList, ChannelEvent.ChannelEventMessageType.OFF);
} catch (Exception e) {
log.warn("[多个通道离线] 发送失败,数量:{}", commonGBChannelList.size(), e);
}
try {
// 发送catalog
eventPublisher.channelEventPublish(commonGBChannelList, ChannelEvent.ChannelEventMessageType.OFF);
} catch (Exception e) {
log.warn("[多个通道离线] 发送失败,数量:{}", commonGBChannelList.size(), e);
}
return result;
}
@ -298,24 +299,26 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
@Override
@Transactional
public int online(List<CommonGBChannel> commonGBChannelList) {
public int online(List<CommonGBChannel> commonGBChannelList, boolean permission) {
if (commonGBChannelList.isEmpty()) {
log.warn("[多个通道上线] 通道数量为0更新失败");
return 0;
}
// 批量更新
int limitCount = 1000;
int result = 0;
if (commonGBChannelList.size() > limitCount) {
for (int i = 0; i < commonGBChannelList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > commonGBChannelList.size()) {
toIndex = commonGBChannelList.size();
if (permission) {
// 批量更新
int limitCount = 1000;
if (commonGBChannelList.size() > limitCount) {
for (int i = 0; i < commonGBChannelList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > commonGBChannelList.size()) {
toIndex = commonGBChannelList.size();
}
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "ON");
}
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "ON");
} else {
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "ON");
}
} else {
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "ON");
}
try {
// 发送catalog
@ -358,27 +361,29 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
}
@Override
public void batchUpdate(List<CommonGBChannel> commonGBChannels) {
public void batchUpdateForStreamPushRedisMsg(List<CommonGBChannel> commonGBChannels, boolean permission) {
if (commonGBChannels.isEmpty()) {
log.warn("[更新多个通道] 通道数量为0更新失败");
return;
}
List<CommonGBChannel> oldCommonGBChannelList = commonGBChannelMapper.queryOldChanelListByChannels(commonGBChannels);
// 批量保存
int limitCount = 1000;
int result = 0;
if (commonGBChannels.size() > limitCount) {
for (int i = 0; i < commonGBChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > commonGBChannels.size()) {
toIndex = commonGBChannels.size();
if (permission) {
// 批量保存
int limitCount = 1000;
int result = 0;
if (commonGBChannels.size() > limitCount) {
for (int i = 0; i < commonGBChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > commonGBChannels.size()) {
toIndex = commonGBChannels.size();
}
result += commonGBChannelMapper.batchUpdate(commonGBChannels.subList(i, toIndex));
}
result += commonGBChannelMapper.batchUpdate(commonGBChannels.subList(i, toIndex));
} else {
result += commonGBChannelMapper.batchUpdate(commonGBChannels);
}
} else {
result += commonGBChannelMapper.batchUpdate(commonGBChannels);
log.info("[更新多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result);
}
log.info("[更新多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result);
// 发送通过更新通知
try {
// 发送通知

View File

@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.gb28181.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.dao.CommonGBChannelMapper;
@ -270,14 +269,12 @@ public class GroupServiceImpl implements IGroupService {
if (businessGroupInDb == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "业务分组不存在");
}
List<Group> groupList = new LinkedList<>();
groupList.add(businessGroupInDb);
Group group = groupManager.queryOneByDeviceId(deviceId, businessGroup);
if (group == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "虚拟组织不存在");
}
List<Group> allParent = getAllParent(group);
groupList.addAll(allParent);
List<Group> groupList = new LinkedList<>(allParent);
groupList.add(group);
return groupList;
}
@ -287,10 +284,9 @@ public class GroupServiceImpl implements IGroupService {
return new ArrayList<>();
}
List<Group> groupList = new ArrayList<>();
Group parent = groupManager.queryOneByDeviceId(group.getParentDeviceId(), group.getBusinessGroup());
if (parent == null) {
return groupList;
return new ArrayList<>();
}
List<Group> allParent = getAllParent(parent);
allParent.add(parent);
@ -322,7 +318,6 @@ public class GroupServiceImpl implements IGroupService {
@Override
@Transactional
public void saveByAlias(Collection<Group> groups) {
log.info("[存储分组数据] {}", JSONObject.toJSONString(groups));
// 清空别名数据
groupManager.deleteHasAlias();
// 写入新数据

View File

@ -22,6 +22,7 @@ import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author lin
@ -62,16 +63,24 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
@EventListener
public void onApplicationEvent(ChannelEvent event) {
if (event.getChannels().isEmpty()) {
log.info("[国标级联-处理通道变化事件] 通道数量为空");
return;
}
String deviceIds = event.getChannels().stream().map(CommonGBChannel::getGbDeviceId).collect(Collectors.joining());
log.info("[国标级联-处理通道变化事件] 类型: {}, 通道: {}", event.getMessageType(), deviceIds);
// 获取通道所关联的平台
List<Platform> allPlatform = platformMapper.queryByServerId(userSetting.getServerId());
if (allPlatform.isEmpty()) {
log.info("[国标级联-处理通道变化事件] 没有关联的平台");
return;
}
// 获取所用订阅
List<String> platforms = subscribeHolder.getAllCatalogSubscribePlatform(allPlatform);
Map<String, List<Platform>> platformMap = new HashMap<>();
Map<String, CommonGBChannel> channelMap = new HashMap<>();
if (platforms.isEmpty()) {
log.info("[国标级联-处理通道变化事件] 没有关联的平台的目录订阅");
return;
}
for (CommonGBChannel deviceChannel : event.getChannels()) {
@ -81,6 +90,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
channelMap.put(deviceChannel.getGbDeviceId(), deviceChannel);
}
if (platformMap.isEmpty()) {
log.info("[国标级联-处理通道变化事件] 开启订阅的平台都没有关联通道: {}", deviceIds);
return;
}
switch (event.getMessageType()) {
@ -147,13 +157,16 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
@EventListener
public void onApplicationEvent(CatalogEvent event) {
log.info("[Catalog事件: {}]通道数量: {}", event.getType(), event.getChannels().size());
String deviceIds = event.getChannels().stream().map(CommonGBChannel::getGbDeviceId).collect(Collectors.joining());
log.info("[Catalog事件: {}] 通道: {}", event.getType(), deviceIds);
Platform platform = event.getPlatform();
if (platform == null || platform.getServerGBId() == null) {
log.info("[Catalog事件: {}] 缺少通道或通道数据异常: {}", event.getType(), deviceIds);
return;
}
SubscribeInfo subscribe = subscribeHolder.getCatalogSubscribe(platform.getServerGBId());
if (subscribe == null) {
log.info("[Catalog事件: {}] 平台未被目录订阅,取消发送: {}", event.getType(), deviceIds);
return;
}
switch (event.getType()) {
@ -165,7 +178,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
channels.addAll(event.getChannels());
}
if (!channels.isEmpty()) {
log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), channels.size());
log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), deviceIds);
try {
sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), platform, channels, subscribe, null);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
@ -185,7 +198,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
deviceChannelList.addAll(event.getChannels());
}
if (!deviceChannelList.isEmpty()) {
log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), deviceChannelList.size());
log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), deviceIds);
try {
sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), platform, deviceChannelList, subscribe, null);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
@ -460,35 +473,40 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
public int removeChannelList(Integer platformId, List<CommonGBChannel> channelList) {
Platform platform = platformMapper.query(platformId);
if (platform == null) {
log.info("[移除关联通道] 平台{}未查询到", platformId);
return 0;
}
String deviceIds = channelList.stream().map(CommonGBChannel::getGbDeviceId).collect(Collectors.joining());
log.info("[移除关联通道] 上级平台: {} 通道: {}", platform.getServerGBId(), deviceIds);
int result = platformChannelMapper.removeChannelsWithPlatform(platformId, channelList);
if (result > 0) {
// 查询通道相关的分组信息
Set<Region> regionSet = regionMapper.queryByChannelList(channelList);
Set<Region> deleteRegion = deleteEmptyRegion(regionSet, platformId);
if (!deleteRegion.isEmpty()) {
for (Region region : deleteRegion) {
channelList.add(0, CommonGBChannel.build(region));
}
if (result <= 0) {
log.info("[移除关联通道] 平台{}未关联通道: {}", platformId, deviceIds);
return 0;
}
// 查询通道相关的分组信息
Set<Region> regionSet = regionMapper.queryByChannelList(channelList);
Set<Region> deleteRegion = deleteEmptyRegion(regionSet, platformId);
if (!deleteRegion.isEmpty()) {
for (Region region : deleteRegion) {
channelList.add(0, CommonGBChannel.build(region));
}
}
// 查询通道相关的分组信息
Set<Group> groupSet = groupMapper.queryByChannelList(channelList);
Set<Group> deleteGroup = deleteEmptyGroup(groupSet, platformId);
if (!deleteGroup.isEmpty()) {
for (Group group : deleteGroup) {
channelList.add(0, CommonGBChannel.build(group));
}
}
// 发送消息
try {
// 发送catalog
eventPublisher.catalogEventPublish(platform, channelList, CatalogEvent.DEL);
} catch (Exception e) {
log.warn("[移除关联通道] 发送失败,数量:{}", channelList.size(), e);
// 查询通道相关的分组信息
Set<Group> groupSet = groupMapper.queryByChannelList(channelList);
Set<Group> deleteGroup = deleteEmptyGroup(groupSet, platformId);
if (!deleteGroup.isEmpty()) {
for (Group group : deleteGroup) {
channelList.add(0, CommonGBChannel.build(group));
}
}
// 发送消息
try {
// 发送catalog
eventPublisher.catalogEventPublish(platform, channelList, CatalogEvent.DEL);
} catch (Exception e) {
log.warn("[移除关联通道] 发送失败,数量:{}", channelList.size(), e);
}
return result;
}
@ -497,6 +515,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
public int removeChannels(Integer platformId, List<Integer> channelIds) {
List<CommonGBChannel> channelList = platformChannelMapper.queryShare(platformId, channelIds);
if (channelList.isEmpty()) {
log.info("[移除通道] 通道列表为空");
return 0;
}
return removeChannelList(platformId, channelList);
@ -507,6 +526,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
public void removeChannels(List<Integer> ids) {
List<Platform> platformList = platformChannelMapper.queryPlatFormListByChannelList(ids);
if (platformList.isEmpty()) {
log.info("[移除多个通道] 未查询到通道关联的平台");
return;
}
@ -520,6 +540,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
public void removeChannel(int channelId) {
List<Platform> platformList = platformChannelMapper.queryPlatFormListByChannelId(channelId);
if (platformList.isEmpty()) {
log.info("[移除多个通道] 未查询到通道:{} 关联的平台", channelId);
return;
}
for (Platform platform : platformList) {
@ -532,6 +553,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
@Override
public List<CommonGBChannel> queryByPlatform(Platform platform) {
if (platform == null) {
log.info("[查询通道所属平台] 平台参数为NULL");
return null;
}
List<CommonGBChannel> commonGBChannelList = commonGBChannelMapper.queryWithPlatform(platform.getId());
@ -570,6 +592,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
Assert.notNull(platform, "平台不存在");
List<CommonGBChannel> channelList = queryByPlatform(platform);
if (channelList.isEmpty()){
log.info("[推送通道] 平台:{} 未查询到通道信息", platform.getServerGBId());
return;
}
SubscribeInfo subscribeInfo = SubscribeInfo.buildSimulated(platform.getServerGBId(), platform.getServerIp());
@ -608,6 +631,8 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
// 获取关联这些通道的平台
List<Platform> platformList = platformChannelMapper.queryPlatFormListByChannelList(channelIds);
if (platformList.isEmpty()) {
String deviceIds = channelList.stream().map(CommonGBChannel::getGbDeviceId).collect(Collectors.joining());
log.info("[获取关联这些通道的平台] 未查询到通道关联的平台, 通道如下 {}", deviceIds);
return;
}
for (Platform platform : platformList) {
@ -646,6 +671,8 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
// 获取关联这些通道的平台
List<Platform> platformList = platformChannelMapper.queryPlatFormListByChannelList(channelIds);
if (platformList.isEmpty()) {
String deviceIds = channelList.stream().map(CommonGBChannel::getGbDeviceId).collect(Collectors.joining());
log.info("[获取关联这些通道的平台] 未查询到通道关联的平台, 通道如下 {}", deviceIds);
return;
}
for (Platform platform : platformList) {
@ -683,6 +710,8 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
});
List<Platform> platformList = platformChannelMapper.queryPlatFormListByChannelList(channelIds);
if (platformList.isEmpty()) {
String deviceIds = channelList.stream().map(CommonGBChannel::getGbDeviceId).collect(Collectors.joining());
log.info("[获取关联这些通道的平台] 未查询到通道关联的平台, 通道如下 {}", deviceIds);
return;
}
for (Platform platform : platformList) {
@ -714,6 +743,8 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
});
List<Platform> platformList = platformChannelMapper.queryPlatFormListByChannelList(channelIds);
if (platformList.isEmpty()) {
String deviceIds = channelList.stream().map(CommonGBChannel::getGbDeviceId).collect(Collectors.joining());
log.info("[获取关联这些通道的平台] 未查询到通道关联的平台, 通道如下 {}", deviceIds);
return;
}
for (Platform platform : platformList) {

View File

@ -236,8 +236,9 @@ public class RegionServiceImpl implements IRegionService {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "行政区划不存在");
}
List<Region> allParent = getAllParent(region);
allParent.add(region);
return allParent;
List<Region> regionList = new LinkedList<>(allParent);
regionList.add(region);
return regionList;
}
@ -246,15 +247,13 @@ public class RegionServiceImpl implements IRegionService {
return new ArrayList<>();
}
List<Region> regionList = new LinkedList<>();
Region parent = regionMapper.queryByDeviceId(region.getParentDeviceId());
if (parent == null) {
return regionList;
return new ArrayList<>();
}
regionList.add(parent);
List<Region> allParent = getAllParent(parent);
regionList.addAll(allParent);
return regionList;
allParent.add(parent);
return allParent;
}
@Override

View File

@ -671,13 +671,13 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override
public boolean stopSendRtp(MediaServer mediaInfo, String app, String stream, String ssrc) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaInfo.getType());
public boolean stopSendRtp(MediaServer mediaServer, String app, String stream, String ssrc) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[stopSendRtp] 失败, mediaServer的类型 {},未找到对应的实现类", mediaInfo.getType());
log.info("[stopSendRtp] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return false;
}
return mediaNodeServerService.stopSendRtp(mediaInfo, app, stream, ssrc);
return mediaNodeServerService.stopSendRtp(mediaServer, app, stream, ssrc);
}
@Override

View File

@ -86,7 +86,6 @@ public class MobilePositionServiceImpl implements IMobilePositionService {
/**
* 查询最新移动位置
* @param deviceId
*/
@Override
public MobilePosition queryLatestPosition(String deviceId) {

View File

@ -70,7 +70,7 @@ public class RedisGpsMsgListener implements MessageListener {
GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class);
gpsMsgInfo.setStored(false);
gpsMsgInfo.setTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(gpsMsgInfo.getTime()));
log.info("[REDIS的位置变化通知], {}", JSON.toJSONString(gpsMsgInfo));
log.debug("[REDIS的位置变化通知], {}", JSON.toJSONString(gpsMsgInfo));
// 只是放入redis缓存起来
redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
} catch (Exception e) {

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
@ -10,7 +11,6 @@ import com.genersoft.iot.vmp.gb28181.bean.RedisGroupMessage;
import com.genersoft.iot.vmp.gb28181.service.IGroupService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -35,10 +35,10 @@ import java.util.concurrent.ConcurrentLinkedQueue;
@Component
public class RedisGroupMsgListener implements MessageListener {
@Resource
@Autowired
private IGroupService groupService;
@Resource
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
@ -141,6 +141,7 @@ public class RedisGroupMsgListener implements MessageListener {
group.setUpdateTime(DateUtil.getNow());
aliasGroupToSave.put(group.getAlias(), group);
}
log.info("[业务分组同步回复-存储分组数据] {}", JSONObject.toJSONString(aliasGroupToSave.values()));
// 存储分组数据
groupService.saveByAlias(aliasGroupToSave.values());
@ -153,7 +154,6 @@ public class RedisGroupMsgListener implements MessageListener {
}
}
/**
@ -173,8 +173,8 @@ public class RedisGroupMsgListener implements MessageListener {
if (isTop) {
codeType = "215";
}
return String.format(deviceTemplate, codeType, RandomStringUtils.secureStrong().next(6, false, true));
}catch (Exception e) {
return String.format(deviceTemplate, codeType, RandomStringUtils.insecure().next(6, false, true));
} catch (Exception e) {
log.error("[REDIS消息-业务分组同步回复] 构建新的分组编号失败", e);
return null;
}

View File

@ -48,10 +48,6 @@ public class RedisPushStreamListMsgListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
String serverId = redisCatchStorage.chooseOneServer(null);
if (!userSetting.getServerId().equals(serverId)) {
return;
}
log.info("[REDIS: 推流设备列表更新] {}", new String(message.getBody()));
taskQueue.offer(message);
}
@ -130,7 +126,7 @@ public class RedisPushStreamListMsgListener implements MessageListener {
if (!streamPushItemForUpdate.isEmpty()) {
log.info("修改{}条", streamPushItemForUpdate.size());
log.info(JSONObject.toJSONString(streamPushItemForUpdate));
streamPushService.batchUpdate(streamPushItemForUpdate);
streamPushService.batchUpdateForRedisMsg(streamPushItemForUpdate);
}
} catch (Exception e) {
log.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", new String(msg.getBody()));

View File

@ -79,19 +79,19 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED);
if (streamStatusMessage.isSetAllOffline()) {
// 所有设备离线
streamPushService.allOffline();
streamPushService.allOfflineForRedisMsg();
}
if (streamStatusMessage.getOfflineStreams() != null
&& !streamStatusMessage.getOfflineStreams().isEmpty()) {
// 更新部分设备离线
log.info("[REDIS: 推流设备状态变化] 更新部分设备离线: {}个", streamStatusMessage.getOfflineStreams().size());
streamPushService.offline(streamStatusMessage.getOfflineStreams());
streamPushService.offlineforRedisMsg(streamStatusMessage.getOfflineStreams());
}
if (streamStatusMessage.getOnlineStreams() != null &&
!streamStatusMessage.getOnlineStreams().isEmpty()) {
// 更新部分设备上线
log.info("[REDIS: 推流设备状态变化] 更新部分设备上线: {}个", streamStatusMessage.getOnlineStreams().size());
streamPushService.online(streamStatusMessage.getOnlineStreams());
streamPushService.onlineForRedisMsg(streamStatusMessage.getOnlineStreams());
}
} catch (Exception e) {
log.warn("[REDIS消息-推流设备状态变化] 发现未处理的异常, \r\n{}", JSON.parseObject(msg.getBody()));
@ -115,7 +115,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, () -> {
log.info("[REDIS消息]未收到redis回复推流设备状态执行推流设备离线");
// 五秒收不到请求就设置通道离线然后通知上级离线
streamPushService.allOffline();
streamPushService.allOfflineForRedisMsg();
}, 5000);
}

View File

@ -524,7 +524,8 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
if (serverId != null) {
redisTemplate.opsForZSet().remove(key, serverId);
}
Set<Object> range = redisTemplate.opsForZSet().range(key, 0, 0);
// 获取得分最高的也是最后更新时间到redis的wvp这样可以避免读取到离线的wvp同时时间最新也一定程度代表最健康的
Set<Object> range = redisTemplate.opsForZSet().reverseRange(key, 0, 0);
if (range == null || range.isEmpty()) {
return null;
}

View File

@ -289,7 +289,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
if (!channelListForOnline.isEmpty()) {
gbChannelService.online(channelListForOnline);
gbChannelService.online(channelListForOnline, true);
}
List<CommonGBChannel> channelListForOffline = new ArrayList<>();
List<StreamProxy> streamProxiesForRemove = new ArrayList<>();
@ -302,7 +302,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
}
if (!channelListForOffline.isEmpty()) {
gbChannelService.offline(channelListForOffline);
gbChannelService.offline(channelListForOffline, true);
}
if (!streamProxiesForRemove.isEmpty()) {
streamProxyMapper.deleteByList(streamProxiesForRemove);
@ -338,7 +338,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
if (!channelListForOffline.isEmpty()) {
// 修改国标关联的国标通道的状态
gbChannelService.offline(channelListForOffline);
gbChannelService.offline(channelListForOffline, true);
}
if (!streamProxiesForSendMessage.isEmpty()) {
for (StreamProxy streamProxy : streamProxiesForSendMessage) {

View File

@ -157,4 +157,9 @@ public interface StreamPushMapper {
"</foreach>" +
"</script>"})
int batchUpdate(List<StreamPush> streamPushItemForUpdate);
@Delete(" DELETE FROM wvp_stream_push st " +
" LEFT join wvp_device_channel wdc on wdc.data_type = 2 and st.id = wdc.data_device_id " +
" where wdc.id is null and st.server_id = #{serverId}")
void deleteWithoutGBId(@Param("serverId") String serverId);
}

View File

@ -52,17 +52,17 @@ public interface IStreamPushService {
/**
* 全部离线
*/
void allOffline();
void allOfflineForRedisMsg();
/**
* 推流离线
*/
void offline(List<StreamPushItemFromRedis> offlineStreams);
void offlineforRedisMsg(List<StreamPushItemFromRedis> offlineStreams);
/**
* 推流上线
*/
void online(List<StreamPushItemFromRedis> onlineStreams);
void onlineForRedisMsg(List<StreamPushItemFromRedis> onlineStreams);
/**
* 增加推流
@ -91,7 +91,7 @@ public interface IStreamPushService {
void updatePushStatus(StreamPush streamPush);
void batchUpdate(List<StreamPush> streamPushItemForUpdate);
void batchUpdateForRedisMsg(List<StreamPush> streamPushItemForUpdate);
int delete(int id);

View File

@ -287,6 +287,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
if (mediaServer != null) {
mediaServerService.closeStreams(mediaServer, streamPush.getApp(), streamPush.getStream());
mediaServerService.stopSendRtp(mediaServer, streamPush.getApp(), streamPush.getStream(), null);
}
streamPush.setPushing(false);
if (userSetting.getUsePushingAsStatus()) {
@ -296,7 +297,6 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
}
sendRtpServerService.deleteByStream(streamPush.getStream());
mediaServerService.stopSendRtp(mediaServer, streamPush.getApp(), streamPush.getStream(), null);
streamPush.setUpdateTime(DateUtil.getNow());
streamPushMapper.update(streamPush);
return true;
@ -383,6 +383,12 @@ public class StreamPushServiceImpl implements IStreamPushService {
redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServer.getId());
}
}
if (!pushItemMap.isEmpty()) {
for (StreamPush streamPush : pushItemMap.values()) {
// 如果没有国标编号从数据库中删除
delete(streamPush.getId());
}
}
Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values();
if (!streamAuthorityInfos.isEmpty()) {
@ -402,11 +408,8 @@ public class StreamPushServiceImpl implements IStreamPushService {
stop(streamPushItem);
}
}
// // 移除没有GBId的推流
// streamPushMapper.deleteWithoutGBId(mediaServerId);
// // 其他的流设置未启用
// streamPushMapper.updateStatusByMediaServerId(mediaServerId, false);
// streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
// 移除没有GBId的推流
streamPushMapper.deleteWithoutGBId(mediaServer.getId());
// 发送流停止消息
String type = "PUSH";
// 发送redis消息
@ -443,7 +446,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
@Override
public void allOffline() {
public void allOfflineForRedisMsg() {
String serverId = redisCatchStorage.chooseOneServer(null);
boolean permission = userSetting.getServerId().equals(serverId);
List<StreamPush> streamPushList = streamPushMapper.selectAll(null, null, null);
if (streamPushList.isEmpty()) {
return;
@ -455,11 +460,13 @@ public class StreamPushServiceImpl implements IStreamPushService {
commonGBChannelList.add(streamPush.buildCommonGBChannel());
}
}
gbChannelService.offline(commonGBChannelList);
gbChannelService.offline(commonGBChannelList, permission);
}
@Override
public void offline(List<StreamPushItemFromRedis> offlineStreams) {
public void offlineforRedisMsg(List<StreamPushItemFromRedis> offlineStreams) {
String serverId = redisCatchStorage.chooseOneServer(null);
boolean permission = userSetting.getServerId().equals(serverId);
// 更新部分设备离线
List<StreamPush> streamPushList = streamPushMapper.getListInList(offlineStreams);
if (streamPushList.isEmpty()) {
@ -467,15 +474,17 @@ public class StreamPushServiceImpl implements IStreamPushService {
return;
}
List<CommonGBChannel> commonGBChannelList = gbChannelService.queryListByStreamPushList(streamPushList);
gbChannelService.offline(commonGBChannelList);
gbChannelService.offline(commonGBChannelList, permission);
}
@Override
public void online(List<StreamPushItemFromRedis> onlineStreams) {
public void onlineForRedisMsg(List<StreamPushItemFromRedis> onlineStreams) {
if (onlineStreams.isEmpty()) {
log.info("[设备上线] 推流设备列表为空");
return;
}
String serverId = redisCatchStorage.chooseOneServer(null);
boolean permission = userSetting.getServerId().equals(serverId);
// 更新部分设备上线streamPushService
List<StreamPush> streamPushList = streamPushMapper.getListInList(onlineStreams);
if (streamPushList.isEmpty()) {
@ -485,7 +494,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
return;
}
List<CommonGBChannel> commonGBChannelList = gbChannelService.queryListByStreamPushList(streamPushList);
gbChannelService.online(commonGBChannelList);
gbChannelService.online(commonGBChannelList, permission);
}
@Override
@ -553,15 +562,19 @@ public class StreamPushServiceImpl implements IStreamPushService {
@Override
@Transactional
public void batchUpdate(List<StreamPush> streamPushItemForUpdate) {
streamPushMapper.batchUpdate(streamPushItemForUpdate);
public void batchUpdateForRedisMsg(List<StreamPush> streamPushItemForUpdate) {
String serverId = redisCatchStorage.chooseOneServer(null);
boolean permission = userSetting.getServerId().equals(serverId);
if (permission) {
streamPushMapper.batchUpdate(streamPushItemForUpdate);
}
List<CommonGBChannel> commonGBChannels = new ArrayList<>();
for (StreamPush streamPush : streamPushItemForUpdate) {
if (!ObjectUtils.isEmpty(streamPush.getGbDeviceId())) {
commonGBChannels.add(streamPush.buildCommonGBChannel());
}
}
gbChannelService.batchUpdate(commonGBChannels);
gbChannelService.batchUpdateForStreamPushRedisMsg(commonGBChannels, permission);
}
@Override

View File

@ -626,5 +626,11 @@ public class CameraChannelController {
return channelService.queryMeetingChannelList(topGroupAlias);
}
@GetMapping(value = "/test")
@ResponseBody
public SYMember test(String device){
return channelService.getMember(device);
}
}

View File

@ -8,9 +8,10 @@ import lombok.Setter;
public class SYMember {
private String no;
private String unicodeNo;
private String blockId;
private Long unicodeNo;
private Long blockId;
private String unitNo;
private String terminalMemberStatus;
private String channelDeviceId;
}

View File

@ -42,6 +42,7 @@ public class CameraChannelService implements CommandLineRunner {
private final String REDIS_CHANNEL_MESSAGE = "VM_MSG_MOBILE_CHANNEL";
private final String REDIS_MEMBER_STATUS_MESSAGE = "VM_MSG_MEMBER_STATUS_CHANNEL";
private final String MOBILE_CHANNEL_PREFIX = "nationalStandardMobileTerminal_";
private final String DELAY_TASK_KEY = "DELAY_TASK_KEY_";
@Autowired
private CommonGBChannelMapper channelMapper;
@ -126,9 +127,9 @@ public class CameraChannelService implements CommandLineRunner {
List<CommonGBChannel> resultListForOnline = new ArrayList<>();
List<CommonGBChannel> resultListForOffline = new ArrayList<>();
List<SYMember> memberList = new ArrayList<>();
List<CommonGBChannel> addMemberList = new ArrayList<>();
Map<String, CommonGBChannel> delayChannelMap = new HashMap<>();
List<SYMember> memberList = new ArrayList<>();
switch (event.getMessageType()) {
@ -157,16 +158,29 @@ public class CameraChannelService implements CommandLineRunner {
if (oldChannel != null) {
if (oldChannel.getGbPtzType() != null && oldChannel.getGbPtzType() == 99) {
resultListForUpdate.add(channel);
// 如果状态变化发送消息
if (!Objects.equals(oldChannel.getGbStatus(), channel.getGbStatus())) {
SYMember member = getMember(channel.getGbDeviceId());
if (member != null) {
if ("ON".equals(channel.getGbStatus())) {
member.setTerminalMemberStatus("ONLINE");
}else {
member.setTerminalMemberStatus("OFFLINE");
}
memberList.add(member);
}
}
}else {
resultListForAdd.add(channel);
if ("ON".equals(channel.getGbStatus())) {
addMemberList.add(channel);
delayChannelMap.put(channel.getGbDeviceId(), channel);
}
}
}else {
resultListForAdd.add(channel);
if ("ON".equals(channel.getGbStatus())) {
addMemberList.add(channel);
delayChannelMap.put(channel.getGbDeviceId(), channel);
}
}
}else {
@ -175,6 +189,11 @@ public class CameraChannelService implements CommandLineRunner {
CameraChannel cameraChannel = new CameraChannel();
cameraChannel.setGbDeviceId(channel.getGbDeviceId());
resultListForDelete.add(cameraChannel);
SYMember member = getMember(cameraChannel.getGbDeviceId());
if (member != null) {
member.setTerminalMemberStatus("OFFLINE");
memberList.add(member);
}
}
}
}
@ -186,8 +205,14 @@ public class CameraChannelService implements CommandLineRunner {
CameraChannel cameraChannel = new CameraChannel();
cameraChannel.setGbDeviceId(channel.getGbDeviceId());
resultListForDelete.add(cameraChannel);
SYMember member = getMember(cameraChannel.getGbDeviceId());
if (member != null) {
member.setTerminalMemberStatus("OFFLINE");
memberList.add(member);
}
}
}
break;
case ON:
case OFF:
@ -221,7 +246,7 @@ public class CameraChannelService implements CommandLineRunner {
if (channel.getGbPtzType() != null && channel.getGbPtzType() == 99) {
resultListForAdd.add(channel);
if ("ON".equals(channel.getGbStatus())) {
addMemberList.add(channel);
delayChannelMap.put(channel.getGbDeviceId(), channel);
}
}
}
@ -232,7 +257,7 @@ public class CameraChannelService implements CommandLineRunner {
jsonObject.put("type", ChannelEvent.ChannelEventMessageType.DEL);
jsonObject.put("list", resultListForDelete);
log.info("[SY-redis发送通知-DEL] 发送 通道信息变化 {}: {}", REDIS_CHANNEL_MESSAGE, jsonObject.toString());
redisTemplate.convertAndSend(REDIS_CHANNEL_MESSAGE, jsonObject);
redisTemplateForString.convertAndSend(REDIS_CHANNEL_MESSAGE, jsonObject.toString());
}
if (!resultListForAdd.isEmpty()) {
sendChannelMessage(resultListForAdd, ChannelEvent.ChannelEventMessageType.ADD);
@ -249,34 +274,42 @@ public class CameraChannelService implements CommandLineRunner {
if (!memberList.isEmpty()) {
sendMemberStatusMessage(memberList);
}
if (!addMemberList.isEmpty()) {
if (!delayChannelMap.isEmpty()) {
// 对于在线的终端进行延迟检查和发送
String key = UUID.randomUUID().toString();
dynamicTask.startDelay(key, () -> {
List<SYMember> members = new ArrayList<>();
for (CommonGBChannel commonGBChannel : addMemberList) {
for (CommonGBChannel commonGBChannel : delayChannelMap.values()) {
String key = DELAY_TASK_KEY + commonGBChannel.getGbDeviceId();
dynamicTask.startDelay(key, () -> {
dynamicTask.stop(key);
SYMember member = getMember(commonGBChannel.getGbDeviceId());
if (member == null) {
continue;
return;
}
member.setTerminalMemberStatus("ONLINE");
members.add(member);
}
if (!members.isEmpty()) {
sendMemberStatusMessage(members);
}
}, 5000);
sendMemberStatusMessage(List.of(member));
}, 3000);
}
}
}
private void sendMemberStatusMessage(List<SYMember> memberList) {
// 取消延时发送
for (SYMember syMember : memberList) {
String key = DELAY_TASK_KEY + syMember.getChannelDeviceId();
if (dynamicTask.contains(key)) {
log.info("[SY-redis发送通知] 取消延时新增任务: {}", key);
dynamicTask.stop(key);
}
}
String jsonString = JSONObject.toJSONString(memberList);
log.info("[SY-redis发送通知] 发送 状态变化 {}: {}", REDIS_MEMBER_STATUS_MESSAGE, jsonString);
redisTemplate.convertAndSend(REDIS_MEMBER_STATUS_MESSAGE, jsonString);
redisTemplateForString.convertAndSend(REDIS_MEMBER_STATUS_MESSAGE, jsonString);
}
private void sendChannelMessage(List<CommonGBChannel> channelList, ChannelEvent.ChannelEventMessageType type) {
if (channelList.isEmpty()) {
log.warn("[SY-redis发送通知-{}] 发送失败,数据为空, 通道信息变化 {}", type, REDIS_CHANNEL_MESSAGE);
return;
}
List<CameraChannel> cameraChannelList = channelMapper.queryCameraChannelByIds(channelList);
@ -284,7 +317,7 @@ public class CameraChannelService implements CommandLineRunner {
jsonObject.put("type", type);
jsonObject.put("list", cameraChannelList);
log.info("[SY-redis发送通知-{}] 发送 通道信息变化 {}: {}", type, REDIS_CHANNEL_MESSAGE, jsonObject.toString());
redisTemplate.convertAndSend(REDIS_CHANNEL_MESSAGE, jsonObject);
redisTemplateForString.convertAndSend(REDIS_CHANNEL_MESSAGE, jsonObject.toString());
}
// 监听GPS消息如果是移动设备则发送redis消息
@ -311,18 +344,21 @@ public class CameraChannelService implements CommandLineRunner {
jsonObject.put("direction", mobilePosition.getDirection());
jsonObject.put("speed", mobilePosition.getSpeed());
jsonObject.put("blockId", member.getBlockId());
jsonObject.put("gbDeviceId", mobilePosition.getChannelDeviceId());
log.info("[SY-redis发送通知-移动设备位置信息] 发送 {}: {}", REDIS_GPS_MESSAGE, jsonObject.toString());
redisTemplate.convertAndSend(REDIS_GPS_MESSAGE, jsonObject);
redisTemplateForString.convertAndSend(REDIS_GPS_MESSAGE, jsonObject.toString());
}
private SYMember getMember(String deviceId) {
public SYMember getMember(String deviceId) {
// 从redis补充信息
String key = MOBILE_CHANNEL_PREFIX + deviceId;
String memberJsonString = (String) redisTemplate.opsForValue().get(key);
if (memberJsonString == null) {
JSONObject jsonObject = (JSONObject)redisTemplate.opsForValue().get(key);
if (jsonObject == null) {
return null;
}
return JSONObject.parseObject(memberJsonString, SYMember.class);
SYMember syMember = JSONObject.parseObject(jsonObject.toString(), SYMember.class);
syMember.setChannelDeviceId(deviceId);
return syMember;
}

View File

@ -861,7 +861,7 @@
320623,如东县,3206
320681,启东市,3206
320682,如皋市,3206
320684,海门,3206
320684,海门,3206
320685,海安市,3206
3207,连云港市,32
320703,连云区,3207
@ -918,8 +918,6 @@
33,浙江省,
3301,杭州市,33
330102,上城区,3301
330103,下城区,3301
330104,江干区,3301
330105,拱墅区,3301
330106,西湖区,3301
330108,滨江区,3301
@ -927,6 +925,8 @@
330110,余杭区,3301
330111,富阳区,3301
330112,临安区,3301
330113,临平区,3301
330114,钱塘区,3301
330122,桐庐县,3301
330127,淳安县,3301
330182,建德市,3301

1 编号 名称 上级
861 320623 如东县 3206
862 320681 启东市 3206
863 320682 如皋市 3206
864 320684 海门市 海门区 3206
865 320685 海安市 3206
866 3207 连云港市 32
867 320703 连云区 3207
918 33 浙江省
919 3301 杭州市 33
920 330102 上城区 3301
330103 下城区 3301
330104 江干区 3301
921 330105 拱墅区 3301
922 330106 西湖区 3301
923 330108 滨江区 3301
925 330110 余杭区 3301
926 330111 富阳区 3301
927 330112 临安区 3301
928 330113 临平区 3301
929 330114 钱塘区 3301
930 330122 桐庐县 3301
931 330127 淳安县 3301
932 330182 建德市 3301

View File

@ -30,11 +30,14 @@
</el-form-item>
<el-form-item label="行政区域">
<el-input v-model="form.gbCivilCode" placeholder="请输入行政区域">
<el-input v-model="form.gbCivilCode" placeholder="请输入行政区域" @change="getRegionPaths">
<template v-slot:append>
<el-button @click="chooseCivilCode()">选择</el-button>
</template>
</el-input>
<el-breadcrumb v-if="regionPath.length > 0" separator="/" style="display: block; margin-top: 8px; font-size: 14px;">
<el-breadcrumb-item v-for="key in regionPath" :key="key">{{ key }}</el-breadcrumb-item>
</el-breadcrumb>
</el-form-item>
<el-form-item label="安装地址">
@ -274,6 +277,7 @@ export default {
loading: false,
modelList: [],
parentPath: [],
regionPath: [],
form: {}
}
},
@ -387,6 +391,7 @@ export default {
this.form = data
this.$set(this.form, 'enableBroadcastForBool', this.form.enableBroadcast === 1)
this.getPaths()
this.getRegionPaths()
})
.finally(() => {
this.loading = false
@ -400,6 +405,7 @@ export default {
chooseCivilCode: function() {
this.$refs.chooseCivilCode.openDialog(code => {
this.form.gbCivilCode = code
this.getRegionPaths()
})
},
chooseGroup: function() {
@ -431,6 +437,20 @@ export default {
this.parentPath = path
})
}
},
getRegionPaths: function() {
this.regionPath = []
if (this.form.gbCivilCode) {
this.$store.dispatch('region/queryPath', this.form.gbCivilCode)
.then(data => {
console.log(data)
const path = []
for (let i = 0; i < data.length; i++) {
path.push(data[i].name)
}
this.regionPath = path
})
}
}
}
}

View File

@ -208,7 +208,7 @@ export default {
},
mounted() {
this.initData()
this.updateLooper = setInterval(this.initData, 10000)
this.updateLooper = setInterval(this.getList, 10000)
},
destroyed() {
this.$destroy('videojs')
@ -216,6 +216,8 @@ export default {
},
methods: {
initData: function() {
this.currentPage = 1
this.total = 0
this.getList()
},
currentChange: function(val) {

View File

@ -10,7 +10,7 @@
placeholder="关键字"
prefix-icon="el-icon-search"
clearable
@input="getPlatformList"
@input="queryList"
/>
</el-form-item>
<el-form-item>
@ -292,6 +292,11 @@ export default {
this.count = val
this.getPlatformList()
},
queryList: function() {
this.currentPage = 1
this.total = 0
this.getPlatformList()
},
getPlatformList: function() {
this.$store.dispatch('platform/query', {
count: this.count,

View File

@ -9,7 +9,7 @@
placeholder="关键字"
prefix-icon="el-icon-search"
clearable
@input="getStreamProxyList"
@input="queryList"
/>
</el-form-item>
<el-form-item label="流媒体">
@ -18,7 +18,7 @@
style="margin-right: 1rem;"
placeholder="请选择"
default-first-option
@change="getStreamProxyList"
@change="queryList"
>
<el-option label="全部" value="" />
<el-option
@ -35,7 +35,7 @@
style="margin-right: 1rem;"
placeholder="请选择"
default-first-option
@change="getStreamProxyList"
@change="queryList"
>
<el-option label="全部" value="" />
<el-option label="正在拉流" value="true" />
@ -189,6 +189,11 @@ export default {
this.count = val
this.getStreamProxyList()
},
queryList: function() {
this.currentPage = 1
this.total = 0
this.getStreamProxyList()
},
getStreamProxyList: function() {
this.$store.dispatch('streamProxy/queryList', {
page: this.currentPage,

View File

@ -9,7 +9,7 @@
placeholder="关键字"
prefix-icon="el-icon-search"
clearable
@input="getPushList"
@input="queryList"
/>
</el-form-item>
<el-form-item label="流媒体">
@ -18,7 +18,7 @@
style="margin-right: 1rem;"
placeholder="请选择"
default-first-option
@change="getPushList"
@change="queryList"
>
<el-option label="全部" value="" />
<el-option
@ -35,7 +35,7 @@
style="margin-right: 1rem;"
placeholder="请选择"
default-first-option
@change="getPushList"
@change="queryList"
>
<el-option label="全部" value="" />
<el-option label="推流中" value="true" />
@ -206,6 +206,11 @@ export default {
this.count = val
this.getPushList()
},
queryList: function() {
this.currentPage = 1
this.total = 0
this.getPushList()
},
getPushList: function() {
this.$store.dispatch('streamPush/queryList', {
page: this.currentPage,