Compare commits

..

1 Commits

Author SHA1 Message Date
阿斌
2bb293c9d1
Pre Merge pull request !36 from 阿斌/N/A 2025-11-17 08:14:45 +00:00
31 changed files with 182 additions and 380 deletions

View File

@ -396,56 +396,4 @@ public class CommonGBChannel {
return commonGBChannel;
}
@Override
public String toString() {
return "CommonGBChannel{" +
"gbId=" + gbId +
", gbDeviceId='" + gbDeviceId + '\'' +
", gbName='" + gbName + '\'' +
", gbManufacturer='" + gbManufacturer + '\'' +
", gbModel='" + gbModel + '\'' +
", gbOwner='" + gbOwner + '\'' +
", gbCivilCode='" + gbCivilCode + '\'' +
", gbBlock='" + gbBlock + '\'' +
", gbAddress='" + gbAddress + '\'' +
", gbParental=" + gbParental +
", gbParentId='" + gbParentId + '\'' +
", gbSafetyWay=" + gbSafetyWay +
", gbRegisterWay=" + gbRegisterWay +
", gbCertNum='" + gbCertNum + '\'' +
", gbCertifiable=" + gbCertifiable +
", gbErrCode=" + gbErrCode +
", gbEndTime='" + gbEndTime + '\'' +
", gbSecrecy=" + gbSecrecy +
", gbIpAddress='" + gbIpAddress + '\'' +
", gbPort=" + gbPort +
", gbPassword='" + gbPassword + '\'' +
", gbStatus='" + gbStatus + '\'' +
", gbLongitude=" + gbLongitude +
", gbLatitude=" + gbLatitude +
", gpsAltitude=" + gpsAltitude +
", gpsSpeed=" + gpsSpeed +
", gpsDirection=" + gpsDirection +
", gpsTime='" + gpsTime + '\'' +
", gbBusinessGroupId='" + gbBusinessGroupId + '\'' +
", gbPtzType=" + gbPtzType +
", gbPositionType=" + gbPositionType +
", gbRoomType=" + gbRoomType +
", gbUseType=" + gbUseType +
", gbSupplyLightType=" + gbSupplyLightType +
", gbDirectionType=" + gbDirectionType +
", gbResolution='" + gbResolution + '\'' +
", gbDownloadSpeed='" + gbDownloadSpeed + '\'' +
", gbSvcSpaceSupportMod=" + gbSvcSpaceSupportMod +
", gbSvcTimeSupportMode=" + gbSvcTimeSupportMode +
", recordPLan=" + recordPLan +
", dataType=" + dataType +
", dataDeviceId=" + dataDeviceId +
", createTime='" + createTime + '\'' +
", updateTime='" + updateTime + '\'' +
", streamId='" + streamId + '\'' +
", enableBroadcast=" + enableBroadcast +
", mapLevel=" + mapLevel +
'}';
}
}

View File

@ -32,8 +32,7 @@ public class SubscribeHolder {
public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) {
log.info("[国标级联] 添加目录订阅,平台: {} 有效期: {}", platformId, subscribeInfo.getExpires());
subscribeInfo.setServerId(userSetting.getServerId());
String key = String.format("%s:%s:%s", prefix, "catalog", platformId);
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platformId);
if (subscribeInfo.getExpires() > 0) {
Duration duration = Duration.ofSeconds(subscribeInfo.getExpires());
redisTemplate.opsForValue().set(key, subscribeInfo, duration);
@ -43,19 +42,18 @@ public class SubscribeHolder {
}
public SubscribeInfo getCatalogSubscribe(String platformId) {
String key = String.format("%s:%s:%s", prefix, "catalog", platformId);
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platformId);
return (SubscribeInfo)redisTemplate.opsForValue().get(key);
}
public void removeCatalogSubscribe(String platformId) {
String key = String.format("%s:%s:%s", prefix, "catalog", platformId);
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platformId);
redisTemplate.delete(key);
}
public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo, Runnable gpsTask) {
log.info("[国标级联] 添加移动位置订阅,平台: {} 有效期: {}s", platformId, subscribeInfo.getExpires());
subscribeInfo.setServerId(userSetting.getServerId());
String key = String.format("%s:%s:%s", prefix, "mobilePosition", platformId);
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platformId);
if (subscribeInfo.getExpires() > 0) {
Duration duration = Duration.ofSeconds(subscribeInfo.getExpires());
redisTemplate.opsForValue().set(key, subscribeInfo, duration);
@ -83,12 +81,12 @@ public class SubscribeHolder {
}
public SubscribeInfo getMobilePositionSubscribe(String platformId) {
String key = String.format("%s:%s:%s", prefix, "mobilePosition", platformId);
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platformId);
return (SubscribeInfo)redisTemplate.opsForValue().get(key);
}
public void removeMobilePositionSubscribe(String platformId) {
String key = String.format("%s:%s:%s", prefix, "mobilePosition", platformId);
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platformId);
redisTemplate.delete(key);
}
@ -98,7 +96,7 @@ public class SubscribeHolder {
}
List<String> result = new ArrayList<>();
for (Platform platform : platformList) {
String key = String.format("%s:%s:%s", prefix, "catalog", platform.getServerGBId());
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platform.getServerGBId());
if (redisTemplate.hasKey(key)) {
result.add(platform.getServerGBId());
}
@ -112,7 +110,7 @@ public class SubscribeHolder {
}
List<String> result = new ArrayList<>();
for (Platform platform : platformList) {
String key = String.format("%s:%s:%s", prefix, "mobilePosition", platform.getServerGBId());
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platform.getServerGBId());
if (redisTemplate.hasKey(key)) {
result.add(platform.getServerGBId());
}

View File

@ -37,11 +37,6 @@ public class SubscribeInfo {
*/
private String simulatedCallId;
/**
* 来源serverId
*/
private String serverId;
public static SubscribeInfo getInstance(SIPResponse response, String id, int expires, EventHeader eventHeader){
SubscribeInfo subscribeInfo = new SubscribeInfo();

View File

@ -22,13 +22,13 @@ public interface DeviceChannelMapper {
"insert into wvp_device_channel " +
"(device_id, data_type, data_device_id, name, manufacturer, model, owner, civil_code, block, " +
"address, parental, parent_id, safety_way, register_way, cert_num, certifiable, err_code, end_time, secrecy, " +
"ip_address, port, password, status, longitude, latitude, ptz_type, position_type, room_type, use_type, " +
"ip_address, port, password, status, longitude, latitude, gb_longitude, gb_latitude, ptz_type, position_type, room_type, use_type, " +
"supply_light_type, direction_type, resolution, business_group_id, download_speed, svc_space_support_mod, " +
"svc_time_support_mode, create_time, update_time, sub_count, stream_id, has_audio, gps_time, stream_identification, channel_type) " +
"values " +
"(#{deviceId}, #{dataType}, #{dataDeviceId}, #{name}, #{manufacturer}, #{model}, #{owner}, #{civilCode}, #{block}, " +
"#{address}, #{parental}, #{parentId}, #{safetyWay}, #{registerWay}, #{certNum}, #{certifiable}, #{errCode}, #{endTime}, #{secrecy}, " +
"#{ipAddress}, #{port}, #{password}, #{status}, #{longitude}, #{latitude}, #{ptzType}, #{positionType}, #{roomType}, #{useType}, " +
"#{ipAddress}, #{port}, #{password}, #{status}, #{longitude}, #{latitude}, #{gbLongitude}, #{gbLatitude}, #{ptzType}, #{positionType}, #{roomType}, #{useType}, " +
"#{supplyLightType}, #{directionType}, #{resolution}, #{businessGroupId}, #{downloadSpeed}, #{svcSpaceSupportMod}," +
" #{svcTimeSupportMode}, #{createTime}, #{updateTime}, #{subCount}, #{streamId}, #{hasAudio}, #{gpsTime}, #{streamIdentification}, #{channelType}) " +
"</script>")
@ -63,6 +63,8 @@ public interface DeviceChannelMapper {
", status=#{status}" +
", longitude=#{longitude}" +
", latitude=#{latitude}" +
", gb_longitude=#{gbLongitude}" +
", gb_latitude=#{gbLatitude}" +
", ptz_type=#{ptzType}" +
", position_type=#{positionType}" +
", room_type=#{roomType}" +
@ -198,14 +200,14 @@ public interface DeviceChannelMapper {
"insert into wvp_device_channel " +
"(device_id, data_type, data_device_id, name, manufacturer, model, owner, civil_code, block, " +
"address, parental, parent_id, safety_way, register_way, cert_num, certifiable, err_code, end_time, secrecy, " +
"ip_address, port, password, status, longitude, latitude, ptz_type, position_type, room_type, use_type, " +
"ip_address, port, password, status, longitude, latitude, gb_longitude, gb_latitude, ptz_type, position_type, room_type, use_type, " +
"supply_light_type, direction_type, resolution, business_group_id, download_speed, svc_space_support_mod, " +
"svc_time_support_mode, create_time, update_time, sub_count, stream_id, has_audio, gps_time, stream_identification, channel_type) " +
"values " +
"<foreach collection='addChannels' index='index' item='item' separator=','> " +
"(#{item.deviceId}, #{item.dataType}, #{item.dataDeviceId}, #{item.name}, #{item.manufacturer}, #{item.model}, #{item.owner}, #{item.civilCode}, #{item.block}, " +
"#{item.address}, #{item.parental}, #{item.parentId}, #{item.safetyWay}, #{item.registerWay}, #{item.certNum}, #{item.certifiable}, #{item.errCode}, #{item.endTime}, #{item.secrecy}, " +
"#{item.ipAddress}, #{item.port}, #{item.password}, #{item.status}, #{item.longitude}, #{item.latitude}, #{item.ptzType}, #{item.positionType}, #{item.roomType}, #{item.useType}, " +
"#{item.ipAddress}, #{item.port}, #{item.password}, #{item.status}, #{item.longitude}, #{item.latitude}, #{item.gbLongitude}, #{item.gbLatitude}, #{item.ptzType}, #{item.positionType}, #{item.roomType}, #{item.useType}, " +
"#{item.supplyLightType}, #{item.directionType}, #{item.resolution}, #{item.businessGroupId}, #{item.downloadSpeed}, #{item.svcSpaceSupportMod}," +
" #{item.svcTimeSupportMode}, #{item.createTime}, #{item.updateTime}, #{item.subCount}, #{item.streamId}, #{item.hasAudio}, #{item.gpsTime}, #{item.streamIdentification}, #{item.channelType}) " +
"</foreach> " +
@ -532,6 +534,8 @@ public interface DeviceChannelMapper {
", status=#{status}" +
", longitude=#{longitude}" +
", latitude=#{latitude}" +
", gb_longitude=#{gbLongitude}" +
", gb_latitude=#{gbLatitude}" +
", ptz_type=#{ptzType}" +
", position_type=#{positionType}" +
", room_type=#{roomType}" +

View File

@ -328,26 +328,11 @@ public interface GroupMapper {
@Delete("DELETE FROM wvp_common_group where alias is not null")
void deleteHasAlias();
@Update(value = " UPDATE wvp_common_group g1" +
@Update(" UPDATE wvp_common_group g1" +
" JOIN wvp_common_group g2" +
" ON g1.parent_device_id = g2.device_id" +
" SET g1.parent_id = g2.id" +
" WHERE g1.alias IS NOT NULL;", databaseId = "mysql")
@Update(value = " UPDATE wvp_common_group g1" +
" JOIN wvp_common_group g2" +
" ON g1.parent_device_id = g2.device_id" +
" SET g1.parent_id = g2.id" +
" WHERE g1.alias IS NOT NULL;", databaseId = "h2")
@Update(value = " UPDATE wvp_common_group AS g1" +
" SET parent_id = g2.id" +
" FROM wvp_common_group AS g2" +
" WHERE g1.parent_device_id = g2.device_id" +
" AND g1.alias IS NOT NULL;", databaseId = "kingbase")
@Update(value = " UPDATE wvp_common_group AS g1" +
" SET parent_id = g2.id" +
" FROM wvp_common_group AS g2" +
" WHERE g1.parent_device_id = g2.device_id" +
" AND g1.alias IS NOT NULL;", databaseId = "postgresql")
" WHERE g1.alias IS NOT NULL;")
void fixParentId();
}

View File

@ -295,7 +295,6 @@ public class ChannelProvider {
if (params.get("parentDeviceId") != null) {
sqlBuild.append(" AND coalesce(gb_parent_id, parent_id) = #{parentDeviceId}");
}
sqlBuild.append(" order by create_time desc");
return sqlBuild.toString();
}

View File

@ -65,19 +65,16 @@ public class EventPublisher {
}
public void channelEventPublishForUpdate(CommonGBChannel commonGBChannel, CommonGBChannel deviceChannelForOld) {
log.info("[通道改变内部分发-更新] {}", commonGBChannel.getGbDeviceId());
ChannelEvent channelEvent = ChannelEvent.getInstanceForUpdate(this, Collections.singletonList(commonGBChannel), Collections.singletonList(deviceChannelForOld));
applicationEventPublisher.publishEvent(channelEvent);
}
public void channelEventPublishForUpdate(List<CommonGBChannel> channelList, List<CommonGBChannel> channelListForOld) {
log.info("[通道改变内部分发-更新] 数量: {}", channelList.size());
ChannelEvent channelEvent = ChannelEvent.getInstanceForUpdate(this, channelList, channelListForOld);
applicationEventPublisher.publishEvent(channelEvent);
}
public void channelEventPublish(List<CommonGBChannel> channelList, ChannelEvent.ChannelEventMessageType type) {
log.info("[通道改变内部分发-{}] 数量: {}", type, channelList.size());
ChannelEvent channelEvent = ChannelEvent.getInstance(this, type, channelList);
applicationEventPublisher.publishEvent(channelEvent);
}

View File

@ -24,11 +24,11 @@ public interface IGbChannelService {
int offline(CommonGBChannel commonGBChannel);
int offline(List<CommonGBChannel> commonGBChannelList, boolean permission);
int offline(List<CommonGBChannel> commonGBChannelList);
int online(CommonGBChannel commonGBChannel);
int online(List<CommonGBChannel> commonGBChannelList, boolean permission);
int online(List<CommonGBChannel> commonGBChannelList);
void batchAdd(List<CommonGBChannel> commonGBChannels);
@ -78,7 +78,7 @@ public interface IGbChannelService {
void deleteChannelToGroupByGbDevice(List<Integer> deviceIds);
void batchUpdateForStreamPushRedisMsg(List<CommonGBChannel> commonGBChannels, boolean permission);
void batchUpdate(List<CommonGBChannel> commonGBChannels);
CommonGBChannel queryOneWithPlatform(Integer platformId, String channelDeviceId);

View File

@ -187,7 +187,7 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
@Override
public int update(CommonGBChannel commonGBChannel) {
log.info("[更新通道] 通道ID: {}, ", commonGBChannel.toString());
log.info("[更新通道] 通道ID: {}, ", commonGBChannel.getGbId());
if (commonGBChannel.getGbId() <= 0) {
log.warn("[更新通道] 未找到数据库ID更新失败 {}({})", commonGBChannel.getGbName(), commonGBChannel.getGbDeviceId());
return 0;
@ -248,33 +248,32 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
@Override
@Transactional
public int offline(List<CommonGBChannel> commonGBChannelList, boolean permission) {
public int offline(List<CommonGBChannel> commonGBChannelList) {
if (commonGBChannelList.isEmpty()) {
log.warn("[多个通道离线] 通道数量为0更新失败");
return 0;
}
log.info("[通道离线] 共 {} 个", commonGBChannelList.size());
int limitCount = 1000;
int result = 0;
if (permission) {
int limitCount = 1000;
if (commonGBChannelList.size() > limitCount) {
for (int i = 0; i < commonGBChannelList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > commonGBChannelList.size()) {
toIndex = commonGBChannelList.size();
}
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "OFF");
if (commonGBChannelList.size() > limitCount) {
for (int i = 0; i < commonGBChannelList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > commonGBChannelList.size()) {
toIndex = commonGBChannelList.size();
}
} else {
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "OFF");
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "OFF");
}
log.info("[通道离线] 保存入库 共 {} 个改变", result);
} else {
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "OFF");
}
try {
// 发送catalog
eventPublisher.channelEventPublish(commonGBChannelList, ChannelEvent.ChannelEventMessageType.OFF);
} catch (Exception e) {
log.warn("[多个通道离线] 发送失败,数量:{}", commonGBChannelList.size(), e);
if (result > 0) {
try {
// 发送catalog
eventPublisher.channelEventPublish(commonGBChannelList, ChannelEvent.ChannelEventMessageType.OFF);
} catch (Exception e) {
log.warn("[多个通道离线] 发送失败,数量:{}", commonGBChannelList.size(), e);
}
}
return result;
}
@ -299,26 +298,24 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
@Override
@Transactional
public int online(List<CommonGBChannel> commonGBChannelList, boolean permission) {
public int online(List<CommonGBChannel> commonGBChannelList) {
if (commonGBChannelList.isEmpty()) {
log.warn("[多个通道上线] 通道数量为0更新失败");
return 0;
}
// 批量更新
int limitCount = 1000;
int result = 0;
if (permission) {
// 批量更新
int limitCount = 1000;
if (commonGBChannelList.size() > limitCount) {
for (int i = 0; i < commonGBChannelList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > commonGBChannelList.size()) {
toIndex = commonGBChannelList.size();
}
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "ON");
if (commonGBChannelList.size() > limitCount) {
for (int i = 0; i < commonGBChannelList.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > commonGBChannelList.size()) {
toIndex = commonGBChannelList.size();
}
} else {
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "ON");
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "ON");
}
} else {
result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "ON");
}
try {
// 发送catalog
@ -361,29 +358,27 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
}
@Override
public void batchUpdateForStreamPushRedisMsg(List<CommonGBChannel> commonGBChannels, boolean permission) {
public void batchUpdate(List<CommonGBChannel> commonGBChannels) {
if (commonGBChannels.isEmpty()) {
log.warn("[更新多个通道] 通道数量为0更新失败");
return;
}
List<CommonGBChannel> oldCommonGBChannelList = commonGBChannelMapper.queryOldChanelListByChannels(commonGBChannels);
if (permission) {
// 批量保存
int limitCount = 1000;
int result = 0;
if (commonGBChannels.size() > limitCount) {
for (int i = 0; i < commonGBChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > commonGBChannels.size()) {
toIndex = commonGBChannels.size();
}
result += commonGBChannelMapper.batchUpdate(commonGBChannels.subList(i, toIndex));
// 批量保存
int limitCount = 1000;
int result = 0;
if (commonGBChannels.size() > limitCount) {
for (int i = 0; i < commonGBChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > commonGBChannels.size()) {
toIndex = commonGBChannels.size();
}
} else {
result += commonGBChannelMapper.batchUpdate(commonGBChannels);
result += commonGBChannelMapper.batchUpdate(commonGBChannels.subList(i, toIndex));
}
log.info("[更新多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result);
} else {
result += commonGBChannelMapper.batchUpdate(commonGBChannels);
}
log.info("[更新多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result);
// 发送通过更新通知
try {
// 发送通知

View File

@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.dao.CommonGBChannelMapper;
@ -269,12 +270,14 @@ public class GroupServiceImpl implements IGroupService {
if (businessGroupInDb == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "业务分组不存在");
}
List<Group> groupList = new LinkedList<>();
groupList.add(businessGroupInDb);
Group group = groupManager.queryOneByDeviceId(deviceId, businessGroup);
if (group == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "虚拟组织不存在");
}
List<Group> allParent = getAllParent(group);
List<Group> groupList = new LinkedList<>(allParent);
groupList.addAll(allParent);
groupList.add(group);
return groupList;
}
@ -284,9 +287,10 @@ public class GroupServiceImpl implements IGroupService {
return new ArrayList<>();
}
List<Group> groupList = new ArrayList<>();
Group parent = groupManager.queryOneByDeviceId(group.getParentDeviceId(), group.getBusinessGroup());
if (parent == null) {
return new ArrayList<>();
return groupList;
}
List<Group> allParent = getAllParent(parent);
allParent.add(parent);
@ -318,6 +322,7 @@ public class GroupServiceImpl implements IGroupService {
@Override
@Transactional
public void saveByAlias(Collection<Group> groups) {
log.info("[存储分组数据] {}", JSONObject.toJSONString(groups));
// 清空别名数据
groupManager.deleteHasAlias();
// 写入新数据

View File

@ -22,7 +22,6 @@ import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author lin
@ -63,24 +62,16 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
@EventListener
public void onApplicationEvent(ChannelEvent event) {
if (event.getChannels().isEmpty()) {
log.info("[国标级联-处理通道变化事件] 通道数量为空");
return;
}
String deviceIds = event.getChannels().stream().map(CommonGBChannel::getGbDeviceId).collect(Collectors.joining());
log.info("[国标级联-处理通道变化事件] 类型: {}, 通道: {}", event.getMessageType(), deviceIds);
// 获取通道所关联的平台
List<Platform> allPlatform = platformMapper.queryByServerId(userSetting.getServerId());
if (allPlatform.isEmpty()) {
log.info("[国标级联-处理通道变化事件] 没有关联的平台");
return;
}
// 获取所用订阅
List<String> platforms = subscribeHolder.getAllCatalogSubscribePlatform(allPlatform);
Map<String, List<Platform>> platformMap = new HashMap<>();
Map<String, CommonGBChannel> channelMap = new HashMap<>();
if (platforms.isEmpty()) {
log.info("[国标级联-处理通道变化事件] 没有关联的平台的目录订阅");
return;
}
for (CommonGBChannel deviceChannel : event.getChannels()) {
@ -90,7 +81,6 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
channelMap.put(deviceChannel.getGbDeviceId(), deviceChannel);
}
if (platformMap.isEmpty()) {
log.info("[国标级联-处理通道变化事件] 开启订阅的平台都没有关联通道: {}", deviceIds);
return;
}
switch (event.getMessageType()) {
@ -157,16 +147,13 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
@EventListener
public void onApplicationEvent(CatalogEvent event) {
String deviceIds = event.getChannels().stream().map(CommonGBChannel::getGbDeviceId).collect(Collectors.joining());
log.info("[Catalog事件: {}] 通道: {}", event.getType(), deviceIds);
log.info("[Catalog事件: {}]通道数量: {}", event.getType(), event.getChannels().size());
Platform platform = event.getPlatform();
if (platform == null || platform.getServerGBId() == null) {
log.info("[Catalog事件: {}] 缺少通道或通道数据异常: {}", event.getType(), deviceIds);
return;
}
SubscribeInfo subscribe = subscribeHolder.getCatalogSubscribe(platform.getServerGBId());
if (subscribe == null) {
log.info("[Catalog事件: {}] 平台未被目录订阅,取消发送: {}", event.getType(), deviceIds);
return;
}
switch (event.getType()) {
@ -178,7 +165,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
channels.addAll(event.getChannels());
}
if (!channels.isEmpty()) {
log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), deviceIds);
log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), channels.size());
try {
sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), platform, channels, subscribe, null);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
@ -198,7 +185,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
deviceChannelList.addAll(event.getChannels());
}
if (!deviceChannelList.isEmpty()) {
log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), deviceIds);
log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), deviceChannelList.size());
try {
sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), platform, deviceChannelList, subscribe, null);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
@ -473,39 +460,34 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
public int removeChannelList(Integer platformId, List<CommonGBChannel> channelList) {
Platform platform = platformMapper.query(platformId);
if (platform == null) {
log.info("[移除关联通道] 平台{}未查询到", platformId);
return 0;
}
String deviceIds = channelList.stream().map(CommonGBChannel::getGbDeviceId).collect(Collectors.joining());
log.info("[移除关联通道] 上级平台: {} 通道: {}", platform.getServerGBId(), deviceIds);
int result = platformChannelMapper.removeChannelsWithPlatform(platformId, channelList);
if (result <= 0) {
log.info("[移除关联通道] 平台{}未关联通道: {}", platformId, deviceIds);
return 0;
}
// 查询通道相关的分组信息
Set<Region> regionSet = regionMapper.queryByChannelList(channelList);
Set<Region> deleteRegion = deleteEmptyRegion(regionSet, platformId);
if (!deleteRegion.isEmpty()) {
for (Region region : deleteRegion) {
channelList.add(0, CommonGBChannel.build(region));
if (result > 0) {
// 查询通道相关的分组信息
Set<Region> regionSet = regionMapper.queryByChannelList(channelList);
Set<Region> deleteRegion = deleteEmptyRegion(regionSet, platformId);
if (!deleteRegion.isEmpty()) {
for (Region region : deleteRegion) {
channelList.add(0, CommonGBChannel.build(region));
}
}
}
// 查询通道相关的分组信息
Set<Group> groupSet = groupMapper.queryByChannelList(channelList);
Set<Group> deleteGroup = deleteEmptyGroup(groupSet, platformId);
if (!deleteGroup.isEmpty()) {
for (Group group : deleteGroup) {
channelList.add(0, CommonGBChannel.build(group));
// 查询通道相关的分组信息
Set<Group> groupSet = groupMapper.queryByChannelList(channelList);
Set<Group> deleteGroup = deleteEmptyGroup(groupSet, platformId);
if (!deleteGroup.isEmpty()) {
for (Group group : deleteGroup) {
channelList.add(0, CommonGBChannel.build(group));
}
}
// 发送消息
try {
// 发送catalog
eventPublisher.catalogEventPublish(platform, channelList, CatalogEvent.DEL);
} catch (Exception e) {
log.warn("[移除关联通道] 发送失败,数量:{}", channelList.size(), e);
}
}
// 发送消息
try {
// 发送catalog
eventPublisher.catalogEventPublish(platform, channelList, CatalogEvent.DEL);
} catch (Exception e) {
log.warn("[移除关联通道] 发送失败,数量:{}", channelList.size(), e);
}
return result;
}
@ -515,7 +497,6 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
public int removeChannels(Integer platformId, List<Integer> channelIds) {
List<CommonGBChannel> channelList = platformChannelMapper.queryShare(platformId, channelIds);
if (channelList.isEmpty()) {
log.info("[移除通道] 通道列表为空");
return 0;
}
return removeChannelList(platformId, channelList);
@ -526,7 +507,6 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
public void removeChannels(List<Integer> ids) {
List<Platform> platformList = platformChannelMapper.queryPlatFormListByChannelList(ids);
if (platformList.isEmpty()) {
log.info("[移除多个通道] 未查询到通道关联的平台");
return;
}
@ -540,7 +520,6 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
public void removeChannel(int channelId) {
List<Platform> platformList = platformChannelMapper.queryPlatFormListByChannelId(channelId);
if (platformList.isEmpty()) {
log.info("[移除多个通道] 未查询到通道:{} 关联的平台", channelId);
return;
}
for (Platform platform : platformList) {
@ -553,7 +532,6 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
@Override
public List<CommonGBChannel> queryByPlatform(Platform platform) {
if (platform == null) {
log.info("[查询通道所属平台] 平台参数为NULL");
return null;
}
List<CommonGBChannel> commonGBChannelList = commonGBChannelMapper.queryWithPlatform(platform.getId());
@ -592,7 +570,6 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
Assert.notNull(platform, "平台不存在");
List<CommonGBChannel> channelList = queryByPlatform(platform);
if (channelList.isEmpty()){
log.info("[推送通道] 平台:{} 未查询到通道信息", platform.getServerGBId());
return;
}
SubscribeInfo subscribeInfo = SubscribeInfo.buildSimulated(platform.getServerGBId(), platform.getServerIp());
@ -631,8 +608,6 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
// 获取关联这些通道的平台
List<Platform> platformList = platformChannelMapper.queryPlatFormListByChannelList(channelIds);
if (platformList.isEmpty()) {
String deviceIds = channelList.stream().map(CommonGBChannel::getGbDeviceId).collect(Collectors.joining());
log.info("[获取关联这些通道的平台] 未查询到通道关联的平台, 通道如下 {}", deviceIds);
return;
}
for (Platform platform : platformList) {
@ -671,8 +646,6 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
// 获取关联这些通道的平台
List<Platform> platformList = platformChannelMapper.queryPlatFormListByChannelList(channelIds);
if (platformList.isEmpty()) {
String deviceIds = channelList.stream().map(CommonGBChannel::getGbDeviceId).collect(Collectors.joining());
log.info("[获取关联这些通道的平台] 未查询到通道关联的平台, 通道如下 {}", deviceIds);
return;
}
for (Platform platform : platformList) {
@ -710,8 +683,6 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
});
List<Platform> platformList = platformChannelMapper.queryPlatFormListByChannelList(channelIds);
if (platformList.isEmpty()) {
String deviceIds = channelList.stream().map(CommonGBChannel::getGbDeviceId).collect(Collectors.joining());
log.info("[获取关联这些通道的平台] 未查询到通道关联的平台, 通道如下 {}", deviceIds);
return;
}
for (Platform platform : platformList) {
@ -743,8 +714,6 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
});
List<Platform> platformList = platformChannelMapper.queryPlatFormListByChannelList(channelIds);
if (platformList.isEmpty()) {
String deviceIds = channelList.stream().map(CommonGBChannel::getGbDeviceId).collect(Collectors.joining());
log.info("[获取关联这些通道的平台] 未查询到通道关联的平台, 通道如下 {}", deviceIds);
return;
}
for (Platform platform : platformList) {

View File

@ -236,9 +236,8 @@ public class RegionServiceImpl implements IRegionService {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "行政区划不存在");
}
List<Region> allParent = getAllParent(region);
List<Region> regionList = new LinkedList<>(allParent);
regionList.add(region);
return regionList;
allParent.add(region);
return allParent;
}
@ -247,13 +246,15 @@ public class RegionServiceImpl implements IRegionService {
return new ArrayList<>();
}
List<Region> regionList = new LinkedList<>();
Region parent = regionMapper.queryByDeviceId(region.getParentDeviceId());
if (parent == null) {
return new ArrayList<>();
return regionList;
}
regionList.add(parent);
List<Region> allParent = getAllParent(parent);
allParent.add(parent);
return allParent;
regionList.addAll(allParent);
return regionList;
}
@Override

View File

@ -671,13 +671,13 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override
public boolean stopSendRtp(MediaServer mediaServer, String app, String stream, String ssrc) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
public boolean stopSendRtp(MediaServer mediaInfo, String app, String stream, String ssrc) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaInfo.getType());
if (mediaNodeServerService == null) {
log.info("[stopSendRtp] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
log.info("[stopSendRtp] 失败, mediaServer的类型 {},未找到对应的实现类", mediaInfo.getType());
return false;
}
return mediaNodeServerService.stopSendRtp(mediaServer, app, stream, ssrc);
return mediaNodeServerService.stopSendRtp(mediaInfo, app, stream, ssrc);
}
@Override

View File

@ -86,6 +86,7 @@ public class MobilePositionServiceImpl implements IMobilePositionService {
/**
* 查询最新移动位置
* @param deviceId
*/
@Override
public MobilePosition queryLatestPosition(String deviceId) {

View File

@ -70,7 +70,7 @@ public class RedisGpsMsgListener implements MessageListener {
GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class);
gpsMsgInfo.setStored(false);
gpsMsgInfo.setTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(gpsMsgInfo.getTime()));
log.debug("[REDIS的位置变化通知], {}", JSON.toJSONString(gpsMsgInfo));
log.info("[REDIS的位置变化通知], {}", JSON.toJSONString(gpsMsgInfo));
// 只是放入redis缓存起来
redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
} catch (Exception e) {

View File

@ -1,7 +1,6 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
@ -11,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.bean.RedisGroupMessage;
import com.genersoft.iot.vmp.gb28181.service.IGroupService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -35,10 +35,10 @@ import java.util.concurrent.ConcurrentLinkedQueue;
@Component
public class RedisGroupMsgListener implements MessageListener {
@Autowired
@Resource
private IGroupService groupService;
@Autowired
@Resource
private IRedisCatchStorage redisCatchStorage;
@Autowired
@ -141,7 +141,6 @@ public class RedisGroupMsgListener implements MessageListener {
group.setUpdateTime(DateUtil.getNow());
aliasGroupToSave.put(group.getAlias(), group);
}
log.info("[业务分组同步回复-存储分组数据] {}", JSONObject.toJSONString(aliasGroupToSave.values()));
// 存储分组数据
groupService.saveByAlias(aliasGroupToSave.values());
@ -154,6 +153,7 @@ public class RedisGroupMsgListener implements MessageListener {
}
}
/**
@ -173,8 +173,8 @@ public class RedisGroupMsgListener implements MessageListener {
if (isTop) {
codeType = "215";
}
return String.format(deviceTemplate, codeType, RandomStringUtils.insecure().next(6, false, true));
} catch (Exception e) {
return String.format(deviceTemplate, codeType, RandomStringUtils.secureStrong().next(6, false, true));
}catch (Exception e) {
log.error("[REDIS消息-业务分组同步回复] 构建新的分组编号失败", e);
return null;
}

View File

@ -48,6 +48,10 @@ public class RedisPushStreamListMsgListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
String serverId = redisCatchStorage.chooseOneServer(null);
if (!userSetting.getServerId().equals(serverId)) {
return;
}
log.info("[REDIS: 推流设备列表更新] {}", new String(message.getBody()));
taskQueue.offer(message);
}
@ -126,7 +130,7 @@ public class RedisPushStreamListMsgListener implements MessageListener {
if (!streamPushItemForUpdate.isEmpty()) {
log.info("修改{}条", streamPushItemForUpdate.size());
log.info(JSONObject.toJSONString(streamPushItemForUpdate));
streamPushService.batchUpdateForRedisMsg(streamPushItemForUpdate);
streamPushService.batchUpdate(streamPushItemForUpdate);
}
} catch (Exception e) {
log.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", new String(msg.getBody()));

View File

@ -79,19 +79,19 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED);
if (streamStatusMessage.isSetAllOffline()) {
// 所有设备离线
streamPushService.allOfflineForRedisMsg();
streamPushService.allOffline();
}
if (streamStatusMessage.getOfflineStreams() != null
&& !streamStatusMessage.getOfflineStreams().isEmpty()) {
// 更新部分设备离线
log.info("[REDIS: 推流设备状态变化] 更新部分设备离线: {}个", streamStatusMessage.getOfflineStreams().size());
streamPushService.offlineforRedisMsg(streamStatusMessage.getOfflineStreams());
streamPushService.offline(streamStatusMessage.getOfflineStreams());
}
if (streamStatusMessage.getOnlineStreams() != null &&
!streamStatusMessage.getOnlineStreams().isEmpty()) {
// 更新部分设备上线
log.info("[REDIS: 推流设备状态变化] 更新部分设备上线: {}个", streamStatusMessage.getOnlineStreams().size());
streamPushService.onlineForRedisMsg(streamStatusMessage.getOnlineStreams());
streamPushService.online(streamStatusMessage.getOnlineStreams());
}
} catch (Exception e) {
log.warn("[REDIS消息-推流设备状态变化] 发现未处理的异常, \r\n{}", JSON.parseObject(msg.getBody()));
@ -115,7 +115,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, () -> {
log.info("[REDIS消息]未收到redis回复推流设备状态执行推流设备离线");
// 五秒收不到请求就设置通道离线然后通知上级离线
streamPushService.allOfflineForRedisMsg();
streamPushService.allOffline();
}, 5000);
}

View File

@ -524,8 +524,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
if (serverId != null) {
redisTemplate.opsForZSet().remove(key, serverId);
}
// 获取得分最高的也是最后更新时间到redis的wvp这样可以避免读取到离线的wvp同时时间最新也一定程度代表最健康的
Set<Object> range = redisTemplate.opsForZSet().reverseRange(key, 0, 0);
Set<Object> range = redisTemplate.opsForZSet().range(key, 0, 0);
if (range == null || range.isEmpty()) {
return null;
}

View File

@ -289,7 +289,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
if (!channelListForOnline.isEmpty()) {
gbChannelService.online(channelListForOnline, true);
gbChannelService.online(channelListForOnline);
}
List<CommonGBChannel> channelListForOffline = new ArrayList<>();
List<StreamProxy> streamProxiesForRemove = new ArrayList<>();
@ -302,7 +302,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
}
if (!channelListForOffline.isEmpty()) {
gbChannelService.offline(channelListForOffline, true);
gbChannelService.offline(channelListForOffline);
}
if (!streamProxiesForRemove.isEmpty()) {
streamProxyMapper.deleteByList(streamProxiesForRemove);
@ -338,7 +338,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
if (!channelListForOffline.isEmpty()) {
// 修改国标关联的国标通道的状态
gbChannelService.offline(channelListForOffline, true);
gbChannelService.offline(channelListForOffline);
}
if (!streamProxiesForSendMessage.isEmpty()) {
for (StreamProxy streamProxy : streamProxiesForSendMessage) {

View File

@ -157,9 +157,4 @@ public interface StreamPushMapper {
"</foreach>" +
"</script>"})
int batchUpdate(List<StreamPush> streamPushItemForUpdate);
@Delete(" DELETE FROM wvp_stream_push st " +
" LEFT join wvp_device_channel wdc on wdc.data_type = 2 and st.id = wdc.data_device_id " +
" where wdc.id is null and st.server_id = #{serverId}")
void deleteWithoutGBId(@Param("serverId") String serverId);
}

View File

@ -52,17 +52,17 @@ public interface IStreamPushService {
/**
* 全部离线
*/
void allOfflineForRedisMsg();
void allOffline();
/**
* 推流离线
*/
void offlineforRedisMsg(List<StreamPushItemFromRedis> offlineStreams);
void offline(List<StreamPushItemFromRedis> offlineStreams);
/**
* 推流上线
*/
void onlineForRedisMsg(List<StreamPushItemFromRedis> onlineStreams);
void online(List<StreamPushItemFromRedis> onlineStreams);
/**
* 增加推流
@ -91,7 +91,7 @@ public interface IStreamPushService {
void updatePushStatus(StreamPush streamPush);
void batchUpdateForRedisMsg(List<StreamPush> streamPushItemForUpdate);
void batchUpdate(List<StreamPush> streamPushItemForUpdate);
int delete(int id);

View File

@ -287,7 +287,6 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
if (mediaServer != null) {
mediaServerService.closeStreams(mediaServer, streamPush.getApp(), streamPush.getStream());
mediaServerService.stopSendRtp(mediaServer, streamPush.getApp(), streamPush.getStream(), null);
}
streamPush.setPushing(false);
if (userSetting.getUsePushingAsStatus()) {
@ -297,6 +296,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
}
sendRtpServerService.deleteByStream(streamPush.getStream());
mediaServerService.stopSendRtp(mediaServer, streamPush.getApp(), streamPush.getStream(), null);
streamPush.setUpdateTime(DateUtil.getNow());
streamPushMapper.update(streamPush);
return true;
@ -383,12 +383,6 @@ public class StreamPushServiceImpl implements IStreamPushService {
redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServer.getId());
}
}
if (!pushItemMap.isEmpty()) {
for (StreamPush streamPush : pushItemMap.values()) {
// 如果没有国标编号从数据库中删除
delete(streamPush.getId());
}
}
Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values();
if (!streamAuthorityInfos.isEmpty()) {
@ -408,8 +402,11 @@ public class StreamPushServiceImpl implements IStreamPushService {
stop(streamPushItem);
}
}
// 移除没有GBId的推流
streamPushMapper.deleteWithoutGBId(mediaServer.getId());
// // 移除没有GBId的推流
// streamPushMapper.deleteWithoutGBId(mediaServerId);
// // 其他的流设置未启用
// streamPushMapper.updateStatusByMediaServerId(mediaServerId, false);
// streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
// 发送流停止消息
String type = "PUSH";
// 发送redis消息
@ -446,9 +443,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
@Override
public void allOfflineForRedisMsg() {
String serverId = redisCatchStorage.chooseOneServer(null);
boolean permission = userSetting.getServerId().equals(serverId);
public void allOffline() {
List<StreamPush> streamPushList = streamPushMapper.selectAll(null, null, null);
if (streamPushList.isEmpty()) {
return;
@ -460,13 +455,11 @@ public class StreamPushServiceImpl implements IStreamPushService {
commonGBChannelList.add(streamPush.buildCommonGBChannel());
}
}
gbChannelService.offline(commonGBChannelList, permission);
gbChannelService.offline(commonGBChannelList);
}
@Override
public void offlineforRedisMsg(List<StreamPushItemFromRedis> offlineStreams) {
String serverId = redisCatchStorage.chooseOneServer(null);
boolean permission = userSetting.getServerId().equals(serverId);
public void offline(List<StreamPushItemFromRedis> offlineStreams) {
// 更新部分设备离线
List<StreamPush> streamPushList = streamPushMapper.getListInList(offlineStreams);
if (streamPushList.isEmpty()) {
@ -474,17 +467,15 @@ public class StreamPushServiceImpl implements IStreamPushService {
return;
}
List<CommonGBChannel> commonGBChannelList = gbChannelService.queryListByStreamPushList(streamPushList);
gbChannelService.offline(commonGBChannelList, permission);
gbChannelService.offline(commonGBChannelList);
}
@Override
public void onlineForRedisMsg(List<StreamPushItemFromRedis> onlineStreams) {
public void online(List<StreamPushItemFromRedis> onlineStreams) {
if (onlineStreams.isEmpty()) {
log.info("[设备上线] 推流设备列表为空");
return;
}
String serverId = redisCatchStorage.chooseOneServer(null);
boolean permission = userSetting.getServerId().equals(serverId);
// 更新部分设备上线streamPushService
List<StreamPush> streamPushList = streamPushMapper.getListInList(onlineStreams);
if (streamPushList.isEmpty()) {
@ -494,7 +485,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
return;
}
List<CommonGBChannel> commonGBChannelList = gbChannelService.queryListByStreamPushList(streamPushList);
gbChannelService.online(commonGBChannelList, permission);
gbChannelService.online(commonGBChannelList);
}
@Override
@ -562,19 +553,15 @@ public class StreamPushServiceImpl implements IStreamPushService {
@Override
@Transactional
public void batchUpdateForRedisMsg(List<StreamPush> streamPushItemForUpdate) {
String serverId = redisCatchStorage.chooseOneServer(null);
boolean permission = userSetting.getServerId().equals(serverId);
if (permission) {
streamPushMapper.batchUpdate(streamPushItemForUpdate);
}
public void batchUpdate(List<StreamPush> streamPushItemForUpdate) {
streamPushMapper.batchUpdate(streamPushItemForUpdate);
List<CommonGBChannel> commonGBChannels = new ArrayList<>();
for (StreamPush streamPush : streamPushItemForUpdate) {
if (!ObjectUtils.isEmpty(streamPush.getGbDeviceId())) {
commonGBChannels.add(streamPush.buildCommonGBChannel());
}
}
gbChannelService.batchUpdateForStreamPushRedisMsg(commonGBChannels, permission);
gbChannelService.batchUpdate(commonGBChannels);
}
@Override

View File

@ -626,11 +626,5 @@ public class CameraChannelController {
return channelService.queryMeetingChannelList(topGroupAlias);
}
@GetMapping(value = "/test")
@ResponseBody
public SYMember test(String device){
return channelService.getMember(device);
}
}

View File

@ -8,10 +8,9 @@ import lombok.Setter;
public class SYMember {
private String no;
private Long unicodeNo;
private Long blockId;
private String unicodeNo;
private String blockId;
private String unitNo;
private String terminalMemberStatus;
private String channelDeviceId;
}

View File

@ -42,7 +42,6 @@ public class CameraChannelService implements CommandLineRunner {
private final String REDIS_CHANNEL_MESSAGE = "VM_MSG_MOBILE_CHANNEL";
private final String REDIS_MEMBER_STATUS_MESSAGE = "VM_MSG_MEMBER_STATUS_CHANNEL";
private final String MOBILE_CHANNEL_PREFIX = "nationalStandardMobileTerminal_";
private final String DELAY_TASK_KEY = "DELAY_TASK_KEY_";
@Autowired
private CommonGBChannelMapper channelMapper;
@ -127,9 +126,9 @@ public class CameraChannelService implements CommandLineRunner {
List<CommonGBChannel> resultListForOnline = new ArrayList<>();
List<CommonGBChannel> resultListForOffline = new ArrayList<>();
Map<String, CommonGBChannel> delayChannelMap = new HashMap<>();
List<SYMember> memberList = new ArrayList<>();
List<CommonGBChannel> addMemberList = new ArrayList<>();
switch (event.getMessageType()) {
@ -158,29 +157,16 @@ public class CameraChannelService implements CommandLineRunner {
if (oldChannel != null) {
if (oldChannel.getGbPtzType() != null && oldChannel.getGbPtzType() == 99) {
resultListForUpdate.add(channel);
// 如果状态变化发送消息
if (!Objects.equals(oldChannel.getGbStatus(), channel.getGbStatus())) {
SYMember member = getMember(channel.getGbDeviceId());
if (member != null) {
if ("ON".equals(channel.getGbStatus())) {
member.setTerminalMemberStatus("ONLINE");
}else {
member.setTerminalMemberStatus("OFFLINE");
}
memberList.add(member);
}
}
}else {
resultListForAdd.add(channel);
if ("ON".equals(channel.getGbStatus())) {
delayChannelMap.put(channel.getGbDeviceId(), channel);
addMemberList.add(channel);
}
}
}else {
resultListForAdd.add(channel);
if ("ON".equals(channel.getGbStatus())) {
delayChannelMap.put(channel.getGbDeviceId(), channel);
addMemberList.add(channel);
}
}
}else {
@ -189,11 +175,6 @@ public class CameraChannelService implements CommandLineRunner {
CameraChannel cameraChannel = new CameraChannel();
cameraChannel.setGbDeviceId(channel.getGbDeviceId());
resultListForDelete.add(cameraChannel);
SYMember member = getMember(cameraChannel.getGbDeviceId());
if (member != null) {
member.setTerminalMemberStatus("OFFLINE");
memberList.add(member);
}
}
}
}
@ -205,14 +186,8 @@ public class CameraChannelService implements CommandLineRunner {
CameraChannel cameraChannel = new CameraChannel();
cameraChannel.setGbDeviceId(channel.getGbDeviceId());
resultListForDelete.add(cameraChannel);
SYMember member = getMember(cameraChannel.getGbDeviceId());
if (member != null) {
member.setTerminalMemberStatus("OFFLINE");
memberList.add(member);
}
}
}
break;
case ON:
case OFF:
@ -246,7 +221,7 @@ public class CameraChannelService implements CommandLineRunner {
if (channel.getGbPtzType() != null && channel.getGbPtzType() == 99) {
resultListForAdd.add(channel);
if ("ON".equals(channel.getGbStatus())) {
delayChannelMap.put(channel.getGbDeviceId(), channel);
addMemberList.add(channel);
}
}
}
@ -257,7 +232,7 @@ public class CameraChannelService implements CommandLineRunner {
jsonObject.put("type", ChannelEvent.ChannelEventMessageType.DEL);
jsonObject.put("list", resultListForDelete);
log.info("[SY-redis发送通知-DEL] 发送 通道信息变化 {}: {}", REDIS_CHANNEL_MESSAGE, jsonObject.toString());
redisTemplateForString.convertAndSend(REDIS_CHANNEL_MESSAGE, jsonObject.toString());
redisTemplate.convertAndSend(REDIS_CHANNEL_MESSAGE, jsonObject);
}
if (!resultListForAdd.isEmpty()) {
sendChannelMessage(resultListForAdd, ChannelEvent.ChannelEventMessageType.ADD);
@ -274,42 +249,34 @@ public class CameraChannelService implements CommandLineRunner {
if (!memberList.isEmpty()) {
sendMemberStatusMessage(memberList);
}
if (!delayChannelMap.isEmpty()) {
if (!addMemberList.isEmpty()) {
// 对于在线的终端进行延迟检查和发送
for (CommonGBChannel commonGBChannel : delayChannelMap.values()) {
String key = DELAY_TASK_KEY + commonGBChannel.getGbDeviceId();
dynamicTask.startDelay(key, () -> {
dynamicTask.stop(key);
String key = UUID.randomUUID().toString();
dynamicTask.startDelay(key, () -> {
List<SYMember> members = new ArrayList<>();
for (CommonGBChannel commonGBChannel : addMemberList) {
SYMember member = getMember(commonGBChannel.getGbDeviceId());
if (member == null) {
return;
continue;
}
member.setTerminalMemberStatus("ONLINE");
sendMemberStatusMessage(List.of(member));
}, 3000);
}
members.add(member);
}
if (!members.isEmpty()) {
sendMemberStatusMessage(members);
}
}, 5000);
}
}
private void sendMemberStatusMessage(List<SYMember> memberList) {
// 取消延时发送
for (SYMember syMember : memberList) {
String key = DELAY_TASK_KEY + syMember.getChannelDeviceId();
if (dynamicTask.contains(key)) {
log.info("[SY-redis发送通知] 取消延时新增任务: {}", key);
dynamicTask.stop(key);
}
}
String jsonString = JSONObject.toJSONString(memberList);
log.info("[SY-redis发送通知] 发送 状态变化 {}: {}", REDIS_MEMBER_STATUS_MESSAGE, jsonString);
redisTemplateForString.convertAndSend(REDIS_MEMBER_STATUS_MESSAGE, jsonString);
redisTemplate.convertAndSend(REDIS_MEMBER_STATUS_MESSAGE, jsonString);
}
private void sendChannelMessage(List<CommonGBChannel> channelList, ChannelEvent.ChannelEventMessageType type) {
if (channelList.isEmpty()) {
log.warn("[SY-redis发送通知-{}] 发送失败,数据为空, 通道信息变化 {}", type, REDIS_CHANNEL_MESSAGE);
return;
}
List<CameraChannel> cameraChannelList = channelMapper.queryCameraChannelByIds(channelList);
@ -317,7 +284,7 @@ public class CameraChannelService implements CommandLineRunner {
jsonObject.put("type", type);
jsonObject.put("list", cameraChannelList);
log.info("[SY-redis发送通知-{}] 发送 通道信息变化 {}: {}", type, REDIS_CHANNEL_MESSAGE, jsonObject.toString());
redisTemplateForString.convertAndSend(REDIS_CHANNEL_MESSAGE, jsonObject.toString());
redisTemplate.convertAndSend(REDIS_CHANNEL_MESSAGE, jsonObject);
}
// 监听GPS消息如果是移动设备则发送redis消息
@ -344,21 +311,18 @@ public class CameraChannelService implements CommandLineRunner {
jsonObject.put("direction", mobilePosition.getDirection());
jsonObject.put("speed", mobilePosition.getSpeed());
jsonObject.put("blockId", member.getBlockId());
jsonObject.put("gbDeviceId", mobilePosition.getChannelDeviceId());
log.info("[SY-redis发送通知-移动设备位置信息] 发送 {}: {}", REDIS_GPS_MESSAGE, jsonObject.toString());
redisTemplateForString.convertAndSend(REDIS_GPS_MESSAGE, jsonObject.toString());
redisTemplate.convertAndSend(REDIS_GPS_MESSAGE, jsonObject);
}
public SYMember getMember(String deviceId) {
private SYMember getMember(String deviceId) {
// 从redis补充信息
String key = MOBILE_CHANNEL_PREFIX + deviceId;
JSONObject jsonObject = (JSONObject)redisTemplate.opsForValue().get(key);
if (jsonObject == null) {
String memberJsonString = (String) redisTemplate.opsForValue().get(key);
if (memberJsonString == null) {
return null;
}
SYMember syMember = JSONObject.parseObject(jsonObject.toString(), SYMember.class);
syMember.setChannelDeviceId(deviceId);
return syMember;
return JSONObject.parseObject(memberJsonString, SYMember.class);
}

View File

@ -30,14 +30,11 @@
</el-form-item>
<el-form-item label="行政区域">
<el-input v-model="form.gbCivilCode" placeholder="请输入行政区域" @change="getRegionPaths">
<el-input v-model="form.gbCivilCode" placeholder="请输入行政区域">
<template v-slot:append>
<el-button @click="chooseCivilCode()">选择</el-button>
</template>
</el-input>
<el-breadcrumb v-if="regionPath.length > 0" separator="/" style="display: block; margin-top: 8px; font-size: 14px;">
<el-breadcrumb-item v-for="key in regionPath" :key="key">{{ key }}</el-breadcrumb-item>
</el-breadcrumb>
</el-form-item>
<el-form-item label="安装地址">
@ -277,7 +274,6 @@ export default {
loading: false,
modelList: [],
parentPath: [],
regionPath: [],
form: {}
}
},
@ -391,7 +387,6 @@ export default {
this.form = data
this.$set(this.form, 'enableBroadcastForBool', this.form.enableBroadcast === 1)
this.getPaths()
this.getRegionPaths()
})
.finally(() => {
this.loading = false
@ -405,7 +400,6 @@ export default {
chooseCivilCode: function() {
this.$refs.chooseCivilCode.openDialog(code => {
this.form.gbCivilCode = code
this.getRegionPaths()
})
},
chooseGroup: function() {
@ -437,20 +431,6 @@ export default {
this.parentPath = path
})
}
},
getRegionPaths: function() {
this.regionPath = []
if (this.form.gbCivilCode) {
this.$store.dispatch('region/queryPath', this.form.gbCivilCode)
.then(data => {
console.log(data)
const path = []
for (let i = 0; i < data.length; i++) {
path.push(data[i].name)
}
this.regionPath = path
})
}
}
}
}

View File

@ -208,7 +208,7 @@ export default {
},
mounted() {
this.initData()
this.updateLooper = setInterval(this.getList, 10000)
this.updateLooper = setInterval(this.initData, 10000)
},
destroyed() {
this.$destroy('videojs')
@ -216,8 +216,6 @@ export default {
},
methods: {
initData: function() {
this.currentPage = 1
this.total = 0
this.getList()
},
currentChange: function(val) {

View File

@ -10,7 +10,7 @@
placeholder="关键字"
prefix-icon="el-icon-search"
clearable
@input="queryList"
@input="getPlatformList"
/>
</el-form-item>
<el-form-item>
@ -292,11 +292,6 @@ export default {
this.count = val
this.getPlatformList()
},
queryList: function() {
this.currentPage = 1
this.total = 0
this.getPlatformList()
},
getPlatformList: function() {
this.$store.dispatch('platform/query', {
count: this.count,

View File

@ -9,7 +9,7 @@
placeholder="关键字"
prefix-icon="el-icon-search"
clearable
@input="queryList"
@input="getStreamProxyList"
/>
</el-form-item>
<el-form-item label="流媒体">
@ -18,7 +18,7 @@
style="margin-right: 1rem;"
placeholder="请选择"
default-first-option
@change="queryList"
@change="getStreamProxyList"
>
<el-option label="全部" value="" />
<el-option
@ -35,7 +35,7 @@
style="margin-right: 1rem;"
placeholder="请选择"
default-first-option
@change="queryList"
@change="getStreamProxyList"
>
<el-option label="全部" value="" />
<el-option label="正在拉流" value="true" />
@ -189,11 +189,6 @@ export default {
this.count = val
this.getStreamProxyList()
},
queryList: function() {
this.currentPage = 1
this.total = 0
this.getStreamProxyList()
},
getStreamProxyList: function() {
this.$store.dispatch('streamProxy/queryList', {
page: this.currentPage,

View File

@ -9,7 +9,7 @@
placeholder="关键字"
prefix-icon="el-icon-search"
clearable
@input="queryList"
@input="getPushList"
/>
</el-form-item>
<el-form-item label="流媒体">
@ -18,7 +18,7 @@
style="margin-right: 1rem;"
placeholder="请选择"
default-first-option
@change="queryList"
@change="getPushList"
>
<el-option label="全部" value="" />
<el-option
@ -35,7 +35,7 @@
style="margin-right: 1rem;"
placeholder="请选择"
default-first-option
@change="queryList"
@change="getPushList"
>
<el-option label="全部" value="" />
<el-option label="推流中" value="true" />
@ -206,11 +206,6 @@ export default {
this.count = val
this.getPushList()
},
queryList: function() {
this.currentPage = 1
this.total = 0
this.getPushList()
},
getPushList: function() {
this.$store.dispatch('streamPush/queryList', {
page: this.currentPage,