diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java index f24b73986..a264cf747 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java @@ -591,6 +591,9 @@ public interface CommonGBChannelMapper { @SelectProvider(type = ChannelProvider.class, method = "queryOnlineListsByGbDeviceId") List queryOnlineListsByGbDeviceId(@Param("deviceId") int deviceId); + @SelectProvider(type = ChannelProvider.class, method = "queryOnlineListsByGbDeviceIds") + List queryOnlineListsByGbDeviceIds(List deviceList); + @SelectProvider(type = ChannelProvider.class, method = "queryCommonChannelByDeviceChannel") CommonGBChannel queryCommonChannelByDeviceChannel(@Param("dataType") Integer dataType, @Param("dataDeviceId") Integer dataDeviceId, @Param("deviceId") String deviceId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java index 77e9e5ecb..b41cf183b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java @@ -610,5 +610,11 @@ public interface DeviceChannelMapper { @Update(value = {"UPDATE wvp_device_channel SET status = 'OFF' WHERE data_type = 1 and data_device_id=#{deviceId}"}) void offlineByDeviceId(@Param("deviceId") int deviceId); + @Update(value = {""}) + void offlineByDeviceIds(List deviceList); + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/ChannelProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/ChannelProvider.java index dd9ddaff2..8158f9e56 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/ChannelProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/ChannelProvider.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.dao.provider; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; +import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Group; import com.genersoft.iot.vmp.streamPush.bean.StreamPush; import com.genersoft.iot.vmp.web.custom.bean.CameraGroup; @@ -601,6 +602,28 @@ public class ChannelProvider { return sqlBuild.toString(); } + public String queryOnlineListsByGbDeviceIds(Map params ){ + StringBuilder sqlBuild = new StringBuilder(); + sqlBuild.append(BASE_SQL_TABLE_NAME); + sqlBuild.append(" where wdc.channel_type = 0 AND coalesce(wdc.gb_status, wdc.status) = 'ON' AND wdc.data_type = 1 "); + + List deviceList = (List)params.get("deviceList"); + if (deviceList != null && !deviceList.isEmpty()) { + sqlBuild.append(" AND data_device_id in ("); + boolean first = true; + for (Device device : deviceList) { + if (!first) { + sqlBuild.append(","); + } + sqlBuild.append("'" + device.getId() + "'"); + first = false; + } + sqlBuild.append(" )"); + } + + return sqlBuild.toString(); + } + public String queryListForSy(Map params ){ StringBuilder sqlBuild = new StringBuilder(); sqlBuild.append(BASE_SQL_FOR_CAMERA_DEVICE); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index cdcee7dd1..b54a403b4 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -118,9 +118,9 @@ public class EventPublisher { } - public void deviceOfflineEventPublish(String deviceId) { + public void deviceOfflineEventPublish(Set deviceIds) { DeviceOfflineEvent event = new DeviceOfflineEvent(this); - event.setDeviceId(deviceId); + event.setDeviceIds(deviceIds); applicationEventPublisher.publishEvent(event); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/device/DeviceOfflineEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/device/DeviceOfflineEvent.java index fcceafef5..e15aa43f3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/device/DeviceOfflineEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/device/DeviceOfflineEvent.java @@ -6,12 +6,13 @@ import lombok.Setter; import org.springframework.context.ApplicationEvent; import java.io.Serial; +import java.util.Set; @Getter @Setter public class DeviceOfflineEvent extends ApplicationEvent { - private String deviceId; + private Set deviceIds; @Serial private static final long serialVersionUID = 1L; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index b20b95ddd..7e8aec7e5 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -278,15 +278,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { @Async @EventListener public void onApplicationEvent(DeviceOfflineEvent event) { - log.info("[设备状态] 到期, 编号: {}", event.getDeviceId()); - Device device = getDeviceByDeviceId(event.getDeviceId()); - Boolean deviceStatus = getDeviceStatus(device); - if (deviceStatus != null && deviceStatus) { - log.info("[设备离线] 主动探测发现设备在线,暂不处理 device:{}", event.getDeviceId()); - online(device); - return; - } - offline(device); + log.info("[设备状态] 到期, 编号: {}", event.getDeviceIds().toString()); + List deviceList = redisCatchStorage.getDeviceList(event.getDeviceIds()); + offline(deviceList); } @Override @@ -392,17 +386,44 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, false); } if (isDevice(deviceId)) { - channelOfflineByDevice(device); + channelOfflineByDevice(List.of(device)); } } - private void channelOfflineByDevice(Device device) { + public void offline(List deviceList) { + if (deviceList == null || deviceList.isEmpty()) { + log.warn("[设备不存在]"); + return; + } + List realDeviceList = new ArrayList<>(); + for (Device device : deviceList) { + log.info("[设备离线] device:{}, 心跳间隔: {},心跳超时次数: {}, 上次心跳时间:{}, 上次注册时间: {}", device.getDeviceId(), + device.getHeartBeatInterval(), device.getHeartBeatCount(), device.getKeepaliveTime(), device.getRegisterTime()); + device.setOnLine(false); + cleanOfflineDevice(device); + if (isDevice(device.getDeviceId())) { + realDeviceList.add(device); + } + redisCatchStorage.updateDevice(device); + if (userSetting.getDeviceStatusNotify()) { + // 发送 redis 消息 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, false); + } + } + deviceMapper.offlineByList(deviceList); + + if (!realDeviceList.isEmpty()) { + channelOfflineByDevice(realDeviceList); + } + } + + private void channelOfflineByDevice(List deviceList) { // 进行通道离线 - List channelList = commonGBChannelMapper.queryOnlineListsByGbDeviceId(device.getId()); + List channelList = commonGBChannelMapper.queryOnlineListsByGbDeviceIds(deviceList); if (channelList.isEmpty()) { return; } - deviceChannelMapper.offlineByDeviceId(device.getId()); + deviceChannelMapper.offlineByDeviceIds(deviceList); // 发送通道离线通知 eventPublisher.channelEventPublish(channelList, ChannelEvent.ChannelEventMessageType.OFF); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusManager.java index 1434ec289..291bcd534 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusManager.java @@ -48,17 +48,15 @@ public class DeviceStatusManager { if (expiredIds != null && !expiredIds.isEmpty()) { redisTemplate.opsForZSet().remove(redisKey(), expiredIds.toArray()); - // 使用 JDK 21 虚拟线程异步分发事件 - for (String deviceId : expiredIds) { - Thread.startVirtualThread(() -> { - // 获取详情后删除缓存 + Thread.startVirtualThread(() -> { + // 获取详情后删除缓存 // Device device = redisCatchStorage.getDevice(deviceId); // redisCatchStorage.removeDevice(deviceId); - // 发送 Spring 异步事件 - eventPublisher.deviceOfflineEventPublish(deviceId); - }); - } + // 发送 Spring 异步事件 + eventPublisher.deviceOfflineEventPublish(expiredIds); + }); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index e7bdffa32..e26fdce39 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import java.util.List; import java.util.Map; +import java.util.Set; public interface IRedisCatchStorage { @@ -79,6 +80,11 @@ public interface IRedisCatchStorage { */ Device getDevice(String deviceId); + /** + * 获取Device + */ + List getDeviceList(Set deviceIds); + void resetAllCSEQ(); void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 40c741acf..a967af64a 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -184,6 +184,17 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { return device; } + @Override + public List getDeviceList(Set deviceIds) { + String key = VideoManagerConstants.DEVICE_PREFIX; + List deviceList = new ArrayList<>(); + List objectList = redisTemplate.opsForHash().multiGet(key, Arrays.asList(deviceIds.toArray())); + for (Object object : objectList) { + deviceList.add((Device)object); + } + return deviceList; + } + @Override public void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo) { String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetting.getServerId();