添加报警订阅功能,包括API接口、前端订阅逻辑及相关服务实现

This commit is contained in:
lin 2026-03-30 16:20:51 +08:00
parent 60b1c687ea
commit 2b7cab7572
13 changed files with 300 additions and 49 deletions

View File

@ -430,4 +430,13 @@ public class DeviceQuery {
public void subscribeMobilePosition(int id, int cycle, int interval) {
deviceService.subscribeMobilePosition(id, cycle, interval);
}
@GetMapping("/subscribe/alarm")
@Operation(summary = "开启/关闭报警订阅")
@Parameter(name = "id", description = "通道的Id", required = true)
@Parameter(name = "cycle", description = "订阅周期", required = true)
@Parameter(name = "interval", description = "报送间隔", required = true)
public void subscribeAlarm(int id, int cycle, int interval) {
deviceService.subscribeMobilePosition(id, cycle, interval);
}
}

View File

@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import jakarta.validation.constraints.NotNull;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
@ -56,6 +57,10 @@ public interface IDeviceService {
*/
boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback);
boolean addAlarmSubscribe(@NotNull Device device, SipTransactionInfo transactionInfo);
boolean removeAlarmSubscribe(Device device, CommonCallback<Boolean> callback);
/**
* 移除移动位置订阅
* @param deviceId 设备ID
@ -194,6 +199,8 @@ public interface IDeviceService {
void deviceStatus(Device device, ErrorCallback<String> callback);
void subscribeAlarm(int id, int cycle);
void updateDeviceHeartInfo(Device device);
void alarm(Device device, String startPriority, String endPriority, String alarmMethod, String alarmType, String startTime, String endTime, ErrorCallback<Object> callback);

View File

@ -22,6 +22,7 @@ import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskRunner;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskInfo;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskRunner;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.impl.SubscribeTaskForAlarm;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.impl.SubscribeTaskForCatalog;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.impl.SubscribeTaskForMobilPosition;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
@ -223,6 +224,12 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
if (subscribeTask != null) {
subscribeTaskRunner.addSubscribe(subscribeTask);
}
}else if (SubscribeTaskForAlarm.name.equals(taskInfo.getName())) {
device.setSubscribeCycleForAlarm((int)taskInfo.getExpireTime());
SubscribeTask subscribeTask = SubscribeTaskForAlarm.getInstance(device, this::mobilPositionSubscribeExpire, taskInfo.getTransactionInfo());
if (subscribeTask != null) {
subscribeTaskRunner.addSubscribe(subscribeTask);
}
}
}
}
@ -247,6 +254,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
subscribeTaskRunner.removeSubscribe(SubscribeTaskForMobilPosition.getKey(device));
}
if (subscribeTaskRunner.containsKey(SubscribeTaskForAlarm.getKey(device))) {
subscribeTaskRunner.removeSubscribe(SubscribeTaskForAlarm.getKey(device));
}
// 离线释放所有ssrc
List<SsrcTransaction> ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(device.getDeviceId());
if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) {
@ -355,6 +365,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
if (device.getSubscribeCycleForMobilePosition() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
addMobilePositionSubscribe(device, null);
}
if (device.getSubscribeCycleForAlarm() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForAlarm.getKey(device))) {
addAlarmSubscribe(device, null);
}
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
@ -458,6 +471,10 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
log.debug("[订阅丢失] 移动位置订阅, 编号: {}, 重新发起订阅", device.getDeviceId());
addMobilePositionSubscribe(device, null);
}
if (device.getSubscribeCycleForAlarm() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForAlarm.getKey(device))) {
log.debug("[订阅丢失] 报警订阅, 编号: {}, 重新发起订阅", device.getDeviceId());
addAlarmSubscribe(device, null);
}
}
}
@ -504,6 +521,18 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
}
private void alarmSubscribeExpire(String deviceId, SipTransactionInfo transactionInfo) {
log.info("[报警订阅] 到期, 编号: {}", deviceId);
Device device = getDeviceByDeviceId(deviceId);
if (device == null) {
log.info("[移报警订阅] 到期, 编号: {}, 设备不存在, 忽略", deviceId);
return;
}
if (device.isOnLine() && device.getSubscribeCycleForAlarm() > 0) {
addMobilePositionSubscribe(device, transactionInfo);
}
}
@Override
public boolean addCatalogSubscribe(@NotNull Device device, SipTransactionInfo transactionInfo) {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
@ -636,6 +665,71 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
return true;
}
@Override
public boolean addAlarmSubscribe(@NotNull Device device, SipTransactionInfo transactionInfo) {
if (transactionInfo == null) {
log.info("[添加报警订阅] 设备 {}", device.getDeviceId());
}else {
log.info("[报警订阅续期] 设备 {}", device.getDeviceId());
}
try {
sipCommander.alarmSubscribe(device, transactionInfo, eventResult -> {
ResponseEvent event = (ResponseEvent) eventResult.event;
// 成功
log.info("[报警订阅]成功: {}", device.getDeviceId());
if (!subscribeTaskRunner.containsKey(SubscribeTaskForAlarm.getKey(device))) {
SIPResponse response = (SIPResponse) event.getResponse();
SipTransactionInfo transactionInfoForResponse = new SipTransactionInfo(response);
SubscribeTask subscribeTask = SubscribeTaskForAlarm.getInstance(device, this::alarmSubscribeExpire, transactionInfoForResponse);
if (subscribeTask != null) {
subscribeTaskRunner.addSubscribe(subscribeTask);
}
}else {
subscribeTaskRunner.updateDelay(SubscribeTaskForAlarm.getKey(device), (device.getSubscribeCycleForAlarm() * 1000L - 500L) + System.currentTimeMillis());
}
},eventResult -> {
// 失败
log.warn("[报警订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 报警订阅: {}", e.getMessage());
return false;
}
return true;
}
@Override
public boolean removeAlarmSubscribe(Device device, CommonCallback<Boolean> callback) {
String key = SubscribeTaskForAlarm.getKey(device);
if (subscribeTaskRunner.containsKey(key)) {
log.info("[移除报警订阅]: {}", device.getDeviceId());
SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key);
if (transactionInfo == null) {
log.warn("[移除报警订阅] 未找到事务信息,{}", device.getDeviceId());
}
try {
device.setSubscribeCycleForAlarm(0);
sipCommander.alarmSubscribe(device, transactionInfo, eventResult -> {
// 成功
log.info("[取消报警订阅]成功: {}", device.getDeviceId());
subscribeTaskRunner.removeSubscribe(SubscribeTaskForAlarm.getKey(device));
if (callback != null) {
callback.run(true);
}
},eventResult -> {
// 失败
log.warn("[取消报警订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
});
}catch (Exception e) {
// 失败
log.warn("[取消报警订阅]失败: {}-{} ", device.getDeviceId(), e.getMessage());
}
}
return true;
}
@Override
public SyncStatus getChannelSyncStatus(String deviceId) {
Device device = deviceMapper.getDeviceByDeviceId(deviceId);
@ -824,6 +918,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
removeMobilePositionSubscribe(device, null);
}
if (subscribeTaskRunner.containsKey(SubscribeTaskForAlarm.getKey(device))) {
removeAlarmSubscribe(device, null);
}
if (deviceStatusTaskRunner.containsKey(deviceId)) {
deviceStatusTaskRunner.removeTask(deviceId);
}
@ -958,6 +1055,37 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
}
@Override
public void subscribeAlarm(int id, int cycle) {
Device device = deviceMapper.query(id);
Assert.notNull(device, "未找到设备");
Assert.isTrue(device.isOnLine(), "设备已离线");
if (device.getSubscribeCycleForCatalog() == cycle) {
return;
}
if (!userSetting.getServerId().equals(device.getServerId())) {
redisRpcService.subscribeAlarm(id, cycle);
return;
}
// 目录订阅相关的信息
if (device.getSubscribeCycleForAlarm() > 0) {
// 订阅周期不同则先取消
removeAlarmSubscribe(device, result->{
device.setSubscribeCycleForCatalog(cycle);
updateDevice(device);
if (cycle > 0) {
// 开启订阅
addAlarmSubscribe(device, null);
}
});
}else {
// 开启订阅
device.setSubscribeCycleForAlarm(cycle);
updateDevice(device);
addAlarmSubscribe(device, null);
}
}
@Override
public void updateDeviceHeartInfo(Device device) {
Device deviceInDb = deviceMapper.query(device.getId());

View File

@ -0,0 +1,48 @@
package com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.impl;
import com.genersoft.iot.vmp.common.SubscribeCallback;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SubscribeTaskForAlarm extends SubscribeTask {
public static final String name = "alarm";
public static SubscribeTask getInstance(Device device, SubscribeCallback callback, SipTransactionInfo transactionInfo) {
if (device.getSubscribeCycleForCatalog() <= 0) {
return null;
}
SubscribeTaskForAlarm subscribeTaskForCatalog = new SubscribeTaskForAlarm();
subscribeTaskForCatalog.setDelayTime((device.getSubscribeCycleForCatalog() * 1000L - 500L) + System.currentTimeMillis());
subscribeTaskForCatalog.setDeviceId(device.getDeviceId());
subscribeTaskForCatalog.setCallback(callback);
subscribeTaskForCatalog.setTransactionInfo(transactionInfo);
return subscribeTaskForCatalog;
}
@Override
public void expired() {
if (super.getCallback() == null) {
log.info("[设备订阅到期] 报警订阅 未找到到期处理回调, 编号: {}", getDeviceId());
return;
}
getCallback().run(getDeviceId(), getTransactionInfo());
}
@Override
public String getKey() {
return String.format("%s_%s", name, getDeviceId());
}
@Override
public String getName() {
return name;
}
public static String getKey(Device device) {
return String.format("%s_%s", SubscribeTaskForAlarm.name, device.getDeviceId());
}
}

View File

@ -276,15 +276,8 @@ public interface ISIPCommander {
/**
* 订阅取消订阅报警信息
* @param device 视频设备
* @param expires 订阅过期时间0 = 取消订阅
* @param startPriority 报警起始级别可选
* @param endPriority 报警终止级别可选
* @param startTime 报警发生起始时间可选
* @param endTime 报警发生终止时间可选
* @return true = 命令发送成功
*/
void alarmSubscribe(Device device, int expires, String startPriority, String endPriority, String alarmMethod, String startTime, String endTime) throws InvalidArgumentException, SipException, ParseException;
SIPRequest alarmSubscribe(Device device, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
/**
* 订阅取消订阅目录信息

View File

@ -41,6 +41,7 @@ import javax.sip.SipFactory;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import java.text.ParseException;
import java.time.LocalDateTime;
import java.util.List;
/**
@ -1219,17 +1220,9 @@ public class SIPCommander implements ISIPCommander {
/**
* 订阅取消订阅报警信息
*
* @param device 视频设备
* @param expires 订阅过期时间0 = 取消订阅
* @param startPriority 报警起始级别可选
* @param endPriority 报警终止级别可选
* @param alarmMethod 报警方式条件可选
* @param startTime 报警发生起始时间可选
* @param endTime 报警发生终止时间可选
* @return true = 命令发送成功
*/
@Override
public void alarmSubscribe(Device device, int expires, String startPriority, String endPriority, String alarmMethod, String startTime, String endTime) throws InvalidArgumentException, SipException, ParseException {
public SIPRequest alarmSubscribe(Device device, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
StringBuffer cmdXml = new StringBuffer(200);
String charset = device.getCharset();
@ -1238,28 +1231,39 @@ public class SIPCommander implements ISIPCommander {
cmdXml.append("<CmdType>Alarm</CmdType>\r\n");
cmdXml.append("<SN>" + (int) ((Math.random() * 9 + 1) * 100000) + "</SN>\r\n");
cmdXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
if (!ObjectUtils.isEmpty(startPriority)) {
cmdXml.append("<StartAlarmPriority>" + startPriority + "</StartAlarmPriority>\r\n");
}
if (!ObjectUtils.isEmpty(endPriority)) {
cmdXml.append("<EndAlarmPriority>" + endPriority + "</EndAlarmPriority>\r\n");
}
if (!ObjectUtils.isEmpty(alarmMethod)) {
cmdXml.append("<AlarmMethod>" + alarmMethod + "</AlarmMethod>\r\n");
}
if (!ObjectUtils.isEmpty(startTime)) {
cmdXml.append("<StartAlarmTime>" + startTime + "</StartAlarmTime>\r\n");
}
if (!ObjectUtils.isEmpty(endTime)) {
cmdXml.append("<EndAlarmTime>" + endTime + "</EndAlarmTime>\r\n");
}
cmdXml.append("<StartAlarmPriority>1</StartAlarmPriority>\r\n");
cmdXml.append("<EndAlarmPriority>4/EndAlarmPriority>\r\n");
cmdXml.append("<AlarmMethod>0</AlarmMethod>\r\n");
LocalDateTime nowDateTime = LocalDateTime.now();
String startTime = DateUtil.formatterISO8601.format(nowDateTime);
// 退后一个月作为结束时间
String endTime = DateUtil.formatterISO8601.format(nowDateTime.plusMonths(1));
cmdXml.append("<StartAlarmTime>" + startTime + "</StartAlarmTime>\r\n");
cmdXml.append("<EndAlarmTime>" + endTime + "</EndAlarmTime>\r\n");
cmdXml.append("</Query>\r\n");
CallIdHeader callIdHeader;
if (sipTransactionInfo != null) {
callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(sipTransactionInfo.getCallId());
} else {
callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport());
}
Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), null, expires, "presence",sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request);
int subscribeCycleForCatalog = device.getSubscribeCycleForCatalog();
if (subscribeCycleForCatalog > 0) {
// 目录订阅有效期不小于 30
subscribeCycleForCatalog = Math.max(subscribeCycleForCatalog, 30);
}
// 有效时间默认为60秒以上
SIPRequest request = (SIPRequest) headerProvider.createSubscribeRequest(device, cmdXml.toString(), sipTransactionInfo, subscribeCycleForCatalog, "presence",
callIdHeader);
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, okEvent);
return request;
}
@Override

View File

@ -70,7 +70,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
Element rootElement = getRootElement(evt);
if (rootElement == null) {
log.error("处理NOTIFY消息时未获取到消息体,{}", evt.getRequest());
responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
return;
}
String cmd = XmlUtil.getText(rootElement, "CmdType");

View File

@ -80,4 +80,6 @@ public interface IRedisRpcService {
WVPResult<Object> deviceInfo(String serverId, Device device);
WVPResult<List<Preset>> queryPreset(String serverId, Device device, String channelId);
void subscribeAlarm(int id, int cycle);
}

View File

@ -1,33 +1,21 @@
package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteMessageInfo;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.gb28181.service.IPTZService;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping;
import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.sip.message.Response;
@Component
@Slf4j
@RedisRpcController("device")
@ -96,4 +84,25 @@ public class RedisRpcGbDeviceController extends RpcController {
return response;
}
/**
* 报警订阅
*/
@RedisRpcMapping("subscribeAlarm")
public RedisRpcResponse subscribeAlarm(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
int id = paramJson.getIntValue("id");
int cycle = paramJson.getIntValue("cycle");
RedisRpcResponse response = request.getResponse();
if (id <= 0) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
deviceService.subscribeAlarm(id, cycle);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
return response;
}
}

View File

@ -228,6 +228,15 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
redisRpcConfig.request(request, 10, TimeUnit.MILLISECONDS);
}
@Override
public void subscribeAlarm(int id, int cycle) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("id", id);
jsonObject.put("cycle", cycle);
RedisRpcRequest request = buildRequest("device/subscribeAlarm", jsonObject);
redisRpcConfig.request(request, 10, TimeUnit.MILLISECONDS);
}
@Override
public boolean updatePlatform(String serverId, Platform platform) {
RedisRpcRequest request = buildRequest("platform/update", platform);

View File

@ -93,6 +93,18 @@ export function subscribeMobilePosition(params) {
}
})
}
export function subscribeForAlarm(params) {
const { id, cycle, interval } = params
return request({
method: 'get',
url: `/api/device/query/subscribe/alarm`,
params: {
id: id,
cycle: cycle,
interval: interval
}
})
}
export function queryBasicParam(deviceId) {
return request({

View File

@ -14,7 +14,7 @@ import {
queryHasStreamChannels,
resetGuard,
setGuard,
subscribeCatalog,
subscribeCatalog, subscribeForAlarm,
subscribeMobilePosition,
sync,
update,
@ -103,6 +103,16 @@ const actions = {
})
})
},
subscribeForAlarm({ commit }, params) {
return new Promise((resolve, reject) => {
subscribeForAlarm(params).then(response => {
const { data } = response
resolve(data)
}).catch(error => {
reject(error)
})
})
},
queryBasicParam({ commit }, deviceId) {
return new Promise((resolve, reject) => {
queryBasicParam(deviceId).then(response => {

View File

@ -103,7 +103,11 @@
:checked="scope.row.subscribeCycleForMobilePosition > 0"
@change="(e)=>subscribeForMobilePosition(scope.row.id, e)"
/>
<!-- <el-checkbox label="报警" disabled :checked="scope.row.subscribeCycleForAlarm > 0"></el-checkbox>-->
<el-checkbox
label="报警"
:checked="scope.row.subscribeCycleForAlarm > 0"
@change="(e)=>subscribeForAlarm(scope.row.id, e)"
/>
</template>
</el-table-column>
<el-table-column prop="keepaliveTime" label="最近心跳" min-width="140" />
@ -429,6 +433,23 @@ export default {
})
})
},
subscribeForAlarm: function(data, value) {
this.$store.dispatch('device/subscribeForAlarm', {
id: data,
cycle: value ? 60 : 0,
interval: value ? 5 : 0
}).then((data) => {
this.$message.success({
showClose: true,
message: value ? '订阅成功' : '取消订阅成功'
})
}).catch((error) => {
this.$message.error({
showClose: true,
message: error.message
})
})
},
syncBasicParam: function(data) {
this.$store.dispatch('device/queryBasicParam')
.then((data) => {