临时提交

This commit is contained in:
lin 2026-01-15 18:05:31 +08:00
parent 332626150f
commit 5373e6082c

View File

@ -1,111 +1,52 @@
package com.genersoft.iot.vmp.gb28181.task.deviceStatus;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class DeviceStatusTaskRunner {
private final Map<String, DeviceStatusTask> subscribes = new ConcurrentHashMap<>();
private final DelayQueue<DeviceStatusTask> delayQueue = new DelayQueue<>();
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private UserSetting userSetting;
private final String prefix = "VMP_DEVICE_STATUS";
private final String prefix = "VMP_DEVICE_EXPIRES";
private final String redisKey = String.format("%s_%s", prefix, userSetting.getServerId());
// 状态过期检查
@Scheduled(fixedDelay = 500, timeUnit = TimeUnit.MILLISECONDS)
@Scheduled(fixedDelay = 1, timeUnit = TimeUnit.SECONDS)
@Async
public void expirationCheck(){
while (!delayQueue.isEmpty()) {
DeviceStatusTask take = null;
try {
take = delayQueue.take();
try {
removeTask(take.getDeviceId());
take.expired();
}catch (Exception e) {
log.error("[设备状态到期] 到期处理时出现异常, 设备编号: {} ", take.getDeviceId());
}
} catch (InterruptedException e) {
log.error("[设备状态任务] ", e);
}
}
}
public void addTask(DeviceStatusTask task) {
Duration duration = Duration.ofSeconds((task.getDelayTime() - System.currentTimeMillis())/1000);
if (duration.getSeconds() < 0) {
return;
}
subscribes.put(task.getDeviceId(), task);
String key = String.format("%s_%s_%s", prefix, userSetting.getServerId(), task.getDeviceId());
redisTemplate.opsForValue().set(key, task.getInfo(), duration);
delayQueue.offer(task);
public void addTask(String deviceId, long expireTime) {
redisTemplate.opsForZSet().add(redisKey, deviceId, expireTime);
}
public boolean removeTask(String key) {
DeviceStatusTask task = subscribes.get(key);
if (task == null) {
public void removeTask(String deviceId) {
redisTemplate.opsForZSet().remove(redisKey, deviceId);
}
public boolean containsKey(String deviceId) {
if (ObjectUtils.isEmpty(deviceId)) {
return false;
}
String redisKey = String.format("%s_%s_%s", prefix, userSetting.getServerId(), task.getDeviceId());
redisTemplate.delete(redisKey);
subscribes.remove(key);
if (delayQueue.contains(task)) {
boolean remove = delayQueue.remove(task);
if (!remove) {
log.info("[移除状态任务] 从延时队列内移除失败: {}", key);
}
}
return true;
}
public SipTransactionInfo getTransactionInfo(String key) {
DeviceStatusTask task = subscribes.get(key);
if (task == null) {
return null;
}
return task.getTransactionInfo();
}
public boolean updateDelay(String key, long expirationTime) {
DeviceStatusTask task = subscribes.get(key);
if (task == null) {
return false;
}
log.debug("[更新状态任务时间] 编号: {}", key);
// 如果值更改时间如果队列中有多个元素时 超时无法出发目前采用移除再加入的方法
delayQueue.remove(task);
task.setDelayTime(expirationTime);
delayQueue.offer(task);
String redisKey = String.format("%s_%s_%s", prefix, userSetting.getServerId(), task.getDeviceId());
Duration duration = Duration.ofSeconds((expirationTime - System.currentTimeMillis())/1000);
redisTemplate.expire(redisKey, duration);
return true;
}
public boolean containsKey(String key) {
return subscribes.containsKey(key);
return redisTemplate.opsForZSet().score(redisKey, deviceId) != null;
}
public List<DeviceStatusTaskInfo> getAllTaskInfo(){