Compare commits

...

4 Commits

Author SHA1 Message Date
田朝盛
a2ac5a715b
Pre Merge pull request !39 from 田朝盛/N/A 2025-11-14 09:12:44 +00:00
lin
30aa3192d0 去除多余依赖 2025-11-14 17:12:16 +08:00
lin
fea157436b 优化redis 消息处理 2025-11-14 17:09:55 +08:00
田朝盛
8dd624ccd4
update src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java.
Signed-off-by: 田朝盛 <1753182616@qq.com>
2025-07-14 08:04:09 +00:00
6 changed files with 29 additions and 9 deletions

View File

@ -8,9 +8,6 @@ public class ServerInfo {
private String ip;
private int port;
/**
* 现在使用的线程数
*/
private String createTime;
public static ServerInfo create(String ip, int port) {

View File

@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.Group;
import com.genersoft.iot.vmp.gb28181.bean.RedisGroupMessage;
import com.genersoft.iot.vmp.gb28181.service.IGroupService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
import com.genersoft.iot.vmp.utils.DateUtil;
import jakarta.annotation.Resource;
@ -37,7 +38,7 @@ public class RedisGroupMsgListener implements MessageListener {
private IGroupService groupService;
@Resource
private IStreamPushService streamPushService;
private IRedisCatchStorage redisCatchStorage;
@Autowired
private UserSetting userSetting;
@ -49,6 +50,10 @@ public class RedisGroupMsgListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
String serverId = redisCatchStorage.chooseOneServer(null);
if (!userSetting.getServerId().equals(serverId)) {
return;
}
log.info("[REDIS: 业务分组同步回复] key {} {}", VideoManagerConstants.VM_MSG_GROUP_LIST_RESPONSE, new String(message.getBody()));
taskQueue.offer(message);
}
@ -77,7 +82,7 @@ public class RedisGroupMsgListener implements MessageListener {
List<RedisGroupMessage> groupMessages = JSON.parseArray(new String(msg.getBody()), RedisGroupMessage.class);
for (int i = 0; i < groupMessages.size(); i++) {
RedisGroupMessage groupMessage = groupMessages.get(i);
log.info("[REDIS消息-业务分组同步回复] {}", groupMessage.toString());
log.info("[REDIS消息-业务分组同步回复] 处理数据: {}", groupMessage.toString());
if (!userSetting.isUseAliasForGroupSync()) {
if (groupMessage.getGroupGbId() == null) {
log.warn("[REDIS消息-业务分组同步回复] 分组编号未设置,{}", groupMessage.toString());
@ -156,11 +161,12 @@ public class RedisGroupMsgListener implements MessageListener {
group.setUpdateTime(DateUtil.getNow());
if (group.getId() > 0) {
log.info("[REDIS消息-业务分组同步回复] 更新入库, {}", JSON.toJSONString(group));
groupService.update(group);
}else {
log.info("[REDIS消息-业务分组同步回复] 新增入库, {}", JSON.toJSONString(group));
groupService.add(group);
}
}
}

View File

@ -2,7 +2,9 @@ package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.streamPush.bean.RedisPushStreamMessage;
import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
@ -36,10 +38,20 @@ public class RedisPushStreamListMsgListener implements MessageListener {
@Resource
private IStreamPushService streamPushService;
@Resource
private IRedisCatchStorage redisCatchStorage;
@Resource
private UserSetting userSetting;
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Override
public void onMessage(Message message, byte[] bytes) {
String serverId = redisCatchStorage.chooseOneServer(null);
if (!userSetting.getServerId().equals(serverId)) {
return;
}
log.info("[REDIS: 推流设备列表更新] {}", new String(message.getBody()));
taskQueue.offer(message);
}

View File

@ -521,7 +521,9 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override
public String chooseOneServer(String serverId) {
String key = VideoManagerConstants.WVP_SERVER_LIST;
redisTemplate.opsForZSet().remove(key, serverId);
if (serverId != null) {
redisTemplate.opsForZSet().remove(key, serverId);
}
Set<Object> range = redisTemplate.opsForZSet().range(key, 0, 0);
if (range == null || range.isEmpty()) {
return null;

View File

@ -182,10 +182,13 @@ public class CameraChannelService implements CommandLineRunner {
case VLOST:
for (CommonGBChannel channel : channels) {
if (channel.getGbPtzType() != null && channel.getGbPtzType() == 99) {
CameraChannel cameraChannel = channelMapper.queryCameraChannelById(channel.getGbId());
if (event.getMessageType() == ChannelEvent.ChannelEventMessageType.ON) {
resultListForOnline.add(channel);
cameraChannel.setGbStatus("ON");
resultListForOnline.add(cameraChannel);
}else {
resultListForOffline.add(channel);
cameraChannel.setGbStatus("OFF");
resultListForOffline.add(cameraChannel);
}
}