临时提交

This commit is contained in:
648540858 2025-05-27 23:19:10 +08:00
parent 3ad5d43368
commit a40446e42b
7 changed files with 76 additions and 21 deletions

View File

@ -15,6 +15,9 @@ import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTask;
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskInfo;
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskRunner;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask; import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskInfo; import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskInfo;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskRunner; import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskRunner;
@ -50,6 +53,7 @@ import javax.sip.SipException;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import java.text.ParseException; import java.text.ParseException;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
@ -111,13 +115,45 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Autowired @Autowired
private SubscribeTaskRunner subscribeTaskRunner; private SubscribeTaskRunner subscribeTaskRunner;
@Autowired
private DeviceStatusTaskRunner deviceStatusTaskRunner;
private Device getDeviceByDeviceIdFromDb(String deviceId) { private Device getDeviceByDeviceIdFromDb(String deviceId) {
return deviceMapper.getDeviceByDeviceId(deviceId); return deviceMapper.getDeviceByDeviceId(deviceId);
} }
@Override @Override
public void run(String... args) throws Exception { public void run(String... args) throws Exception {
// TODO 处理设备离线 List<DeviceStatusTaskInfo> allTaskInfo = deviceStatusTaskRunner.getAllTaskInfo();
List<String> onlineDeviceIds = new ArrayList<>();
if (!allTaskInfo.isEmpty()) {
for (DeviceStatusTaskInfo taskInfo : allTaskInfo) {
Device device = getDeviceByDeviceId(taskInfo.getDeviceId());
if (device == null) {
deviceStatusTaskRunner.removeTask(taskInfo.getDeviceId());
continue;
}
// 恢复定时任务, TCP因为连接已经断开必须等待设备重新连接
DeviceStatusTask deviceStatusTask = DeviceStatusTask.getInstance(taskInfo.getDeviceId(),
taskInfo.getTransactionInfo(), taskInfo.getExpireTime(), this::deviceStatusExpire);
deviceStatusTaskRunner.addTask(deviceStatusTask);
onlineDeviceIds.add(taskInfo.getDeviceId());
}
// 除了记录的设备以外 其他设备全部离线
List<Device> onlineDevice = getAllOnlineDevice(userSetting.getServerId());
if (!onlineDevice.isEmpty()) {
for (Device device : onlineDevice) {
if (!onlineDeviceIds.contains(device.getDeviceId())) {
// 此设备需要离线
// 清理订阅
// 更新数据库
}
}
}
}
// 处理订阅任务 // 处理订阅任务
List<SubscribeTaskInfo> taskInfoList = subscribeTaskRunner.getAllTaskInfo(); List<SubscribeTaskInfo> taskInfoList = subscribeTaskRunner.getAllTaskInfo();
@ -127,7 +163,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
continue; continue;
} }
Device device = getDeviceByDeviceId(taskInfo.getDeviceId()); Device device = getDeviceByDeviceId(taskInfo.getDeviceId());
if (device == null || !device.isOnLine()) { if (device == null || !device.isOnLine() || !onlineDeviceIds.contains(taskInfo.getDeviceId())) {
subscribeTaskRunner.removeSubscribe(taskInfo.getKey()); subscribeTaskRunner.removeSubscribe(taskInfo.getKey());
continue; continue;
} }
@ -148,6 +184,11 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
} }
} }
private void deviceStatusExpire(String deviceId, SipTransactionInfo transactionInfo) {
log.info("[设备状态] 到期, 编号: {}", deviceId);
offline(deviceId, "");
}
@Override @Override
public void online(Device device, SipTransactionInfo sipTransactionInfo) { public void online(Device device, SipTransactionInfo sipTransactionInfo) {
log.info("[设备上线] deviceId{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort()); log.info("[设备上线] deviceId{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort());

View File

@ -654,11 +654,11 @@ public class PlayServiceImpl implements IPlayService {
// 主动连接失败结束流程 清理数据 // 主动连接失败结束流程 清理数据
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), callback.run(InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getMsg(), null);
inviteStreamService.call(InviteSessionType.BROADCAST, channel.getId(), null, inviteStreamService.call(InviteSessionType.BROADCAST, channel.getId(), null,
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getMsg(), null);
} }
} catch (SdpException e) { } catch (SdpException e) {
log.error("[TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channel.getDeviceId(), e); log.error("[TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channel.getDeviceId(), e);

View File

@ -3,18 +3,18 @@ package com.genersoft.iot.vmp.gb28181.task.deviceStatus;
import com.genersoft.iot.vmp.common.DeviceStatusCallback; import com.genersoft.iot.vmp.common.DeviceStatusCallback;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import lombok.Data; import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import java.util.concurrent.Delayed; import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Slf4j
@Data @Data
public abstract class DeviceStatusTask implements Delayed { public class DeviceStatusTask implements Delayed {
private String deviceId; private String deviceId;
private DeviceStatusCallback callback;
private SipTransactionInfo transactionInfo; private SipTransactionInfo transactionInfo;
/** /**
@ -22,7 +22,24 @@ public abstract class DeviceStatusTask implements Delayed {
*/ */
private long delayTime; private long delayTime;
public abstract void expired(); private DeviceStatusCallback callback;
public static DeviceStatusTask getInstance(String deviceId, SipTransactionInfo transactionInfo, long delayTime, DeviceStatusCallback callback) {
DeviceStatusTask deviceStatusTask = new DeviceStatusTask();
deviceStatusTask.setDeviceId(deviceId);
deviceStatusTask.setTransactionInfo(transactionInfo);
deviceStatusTask.setDelayTime((delayTime * 1000L - 500L) + System.currentTimeMillis());
deviceStatusTask.setCallback(callback);
return deviceStatusTask;
}
public void expired() {
if (callback == null) {
log.info("[设备离线] 未找到过期处理回调, {}", deviceId);
return;
}
callback.run(deviceId, transactionInfo);
}
@Override @Override
public long getDelay(@NotNull TimeUnit unit) { public long getDelay(@NotNull TimeUnit unit) {

View File

@ -2,8 +2,6 @@ package com.genersoft.iot.vmp.gb28181.task.deviceStatus;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskInfo;
import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -43,7 +41,7 @@ public class DeviceStatusTaskRunner {
try { try {
take = delayQueue.take(); take = delayQueue.take();
try { try {
removeSubscribe(take.getDeviceId()); removeTask(take.getDeviceId());
take.expired(); take.expired();
}catch (Exception e) { }catch (Exception e) {
log.error("[设备状态到期] 到期处理时出现异常, 设备编号: {} ", take.getDeviceId()); log.error("[设备状态到期] 到期处理时出现异常, 设备编号: {} ", take.getDeviceId());
@ -54,7 +52,7 @@ public class DeviceStatusTaskRunner {
} }
} }
public void addSubscribe(DeviceStatusTask task) { public void addTask(DeviceStatusTask task) {
Duration duration = Duration.ofSeconds((task.getDelayTime() - System.currentTimeMillis())/1000); Duration duration = Duration.ofSeconds((task.getDelayTime() - System.currentTimeMillis())/1000);
if (duration.getSeconds() < 0) { if (duration.getSeconds() < 0) {
return; return;
@ -65,7 +63,7 @@ public class DeviceStatusTaskRunner {
delayQueue.offer(task); delayQueue.offer(task);
} }
public boolean removeSubscribe(String key) { public boolean removeTask(String key) {
DeviceStatusTask task = subscribes.get(key); DeviceStatusTask task = subscribes.get(key);
if (task == null) { if (task == null) {
return false; return false;
@ -95,7 +93,7 @@ public class DeviceStatusTaskRunner {
if (task == null) { if (task == null) {
return false; return false;
} }
log.info("[更新状态任务时间] {}, 编号: {}", task.getName(), key); log.info("[更新状态任务时间] 编号: {}", key);
if (delayQueue.contains(task)) { if (delayQueue.contains(task)) {
boolean remove = delayQueue.remove(task); boolean remove = delayQueue.remove(task);
if (!remove) { if (!remove) {

View File

@ -114,8 +114,6 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
} else { } else {
if (userSetting.getGbDeviceOnline() == 1) { if (userSetting.getGbDeviceOnline() == 1) {
// 对于已经离线的设备判断他的注册是否已经过期 // 对于已经离线的设备判断他的注册是否已经过期
device.setOnLine(true);
device.setRegisterTime(DateUtil.getNow());
deviceService.online(device, null); deviceService.online(device, null);
} }
} }

View File

@ -17,7 +17,8 @@ public enum InviteErrorCode {
ERROR_FOR_RESET_SSRC(-9, "重新设置收流信息失败"), ERROR_FOR_RESET_SSRC(-9, "重新设置收流信息失败"),
ERROR_FOR_SIP_SENDING_FAILED(-10, "命令发送失败"), ERROR_FOR_SIP_SENDING_FAILED(-10, "命令发送失败"),
ERROR_FOR_ASSIST_NOT_READY(-11, "没有可用的assist服务"), ERROR_FOR_ASSIST_NOT_READY(-11, "没有可用的assist服务"),
ERROR_FOR_PARAMETER_ERROR(-13, "参数异常"); ERROR_FOR_PARAMETER_ERROR(-13, "参数异常"),
ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR(-14, "TCP主动连接失败");
private final int code; private final int code;
private final String msg; private final String msg;

View File

@ -1,9 +1,9 @@
// sidebar // sidebar
$menuText:#bfcbd9; $menuText:#bfcbd9;
$menuActiveText:#409EFF; $menuActiveText:#409EFF;
$subMenuActiveText:#f4f4f5; //https://github.com/ElemeFE/element/issues/12951 $subMenuActiveText: #f7ff17; //https://github.com/ElemeFE/element/issues/12951
$menuBg:#304156; $menuBg: #1f61cd;
$menuHover:#263445; $menuHover:#263445;
$subMenuBg:#1f2d3d; $subMenuBg:#1f2d3d;