支持订阅续订消息处理

This commit is contained in:
lin 2025-07-22 10:08:18 +08:00
parent e59b300c2e
commit 82c2cc9bd2
4 changed files with 111 additions and 19 deletions

View File

@ -4,6 +4,8 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -17,6 +19,7 @@ import java.util.concurrent.ConcurrentHashMap;
@Component
public class SubscribeHolder {
private static final Logger log = LoggerFactory.getLogger(SubscribeHolder.class);
@Autowired
private DynamicTask dynamicTask;
@ -33,10 +36,10 @@ public class SubscribeHolder {
catalogMap.put(platformId, subscribeInfo);
if (subscribeInfo.getExpires() > 0) {
// 添加订阅到期
String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId;
String taskOverdueKey = taskOverduePrefix + "Catalog_" + platformId;
// 添加任务处理订阅过期
dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()),
subscribeInfo.getExpires() * 1000);
subscribeInfo.getExpires() * 1000 + 1000);
}
}
@ -47,7 +50,7 @@ public class SubscribeHolder {
public void removeCatalogSubscribe(String platformId) {
catalogMap.remove(platformId);
String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId;
String taskOverdueKey = taskOverduePrefix + "Catalog_" + platformId;
Runnable runnable = dynamicTask.get(taskOverdueKey);
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
@ -62,21 +65,21 @@ public class SubscribeHolder {
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "MobilePosition_" + platformId;
// 添加任务处理GPS定时推送
int cycleForCatalog;
int cycle;
if (subscribeInfo.getGpsInterval() <= 0) {
cycleForCatalog = 5;
cycle = 5;
}else {
cycleForCatalog = subscribeInfo.getGpsInterval();
cycle = subscribeInfo.getGpsInterval();
}
dynamicTask.startCron(key, gpsTask,
cycleForCatalog * 1000);
dynamicTask.startCron(key, gpsTask, cycle * 1000);
String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId;
if (subscribeInfo.getExpires() > 0) {
// 添加任务处理订阅过期
dynamicTask.startDelay(taskOverdueKey, () -> {
log.info("[移动位置订阅] 到期,{}, callId: {}", subscribeInfo.getId(), subscribeInfo.getRequest().getCallId());
removeMobilePositionSubscribe(subscribeInfo.getId());
},
subscribeInfo.getExpires() * 1000);
subscribeInfo.getExpires() * 1000 + 1000);
}
}
@ -85,6 +88,7 @@ public class SubscribeHolder {
}
public void removeMobilePositionSubscribe(String platformId) {
mobilePositionMap.remove(platformId);
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "MobilePosition_" + platformId;
// 结束任务处理GPS定时推送
@ -123,4 +127,58 @@ public class SubscribeHolder {
removeMobilePositionSubscribe(platformId);
removeCatalogSubscribe(platformId);
}
public SubscribeInfo getSubscribeByCallId(String callId) {
for (SubscribeInfo subscribeInfo : catalogMap.values()) {
if (subscribeInfo.getRequest() != null && subscribeInfo.getRequest().getCallIdHeader().getCallId().equals(callId)){
return subscribeInfo;
}
}
for (SubscribeInfo subscribeInfo : mobilePositionMap.values()) {
if (subscribeInfo.getRequest() != null && subscribeInfo.getRequest().getCallIdHeader().getCallId().equals(callId)){
return subscribeInfo;
}
}
return null;
}
public void expires(SubscribeInfo subscribeInfo, int expires) {
String taskOverdueKey = taskOverduePrefix + subscribeInfo.getType() + "_" + subscribeInfo.getId();
if (expires > 0) {
subscribeInfo.setExpires(expires);
// 添加任务处理订阅过期
dynamicTask.startDelay(taskOverdueKey, () -> {
if ("Catalog".equals(subscribeInfo.getType())) {
catalogMap.remove(subscribeInfo.getId());
log.info("[目录订阅] 到期,{}, callId: {}", subscribeInfo.getId(), subscribeInfo.getRequest().getCallId());
removeCatalogSubscribe(subscribeInfo.getId());
}else {
log.info("[移动位置订阅] 到期,{}, callId: {}", subscribeInfo.getId(), subscribeInfo.getRequest().getCallId());
removeMobilePositionSubscribe(subscribeInfo.getId());
}
},
expires * 1000 + 1000);
if ("Catalog".equals(subscribeInfo.getType())) {
catalogMap.put(subscribeInfo.getId(), subscribeInfo);
}else {
mobilePositionMap.put(subscribeInfo.getId(), subscribeInfo);
}
}else {
Runnable runnable = dynamicTask.get(taskOverdueKey);
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
subscribeTask.stop(null);
}
dynamicTask.stop(taskOverdueKey);
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + subscribeInfo.getType() + "_" + subscribeInfo.getId();
dynamicTask.stop(key);
if ("Catalog".equals(subscribeInfo.getType())) {
catalogMap.remove(subscribeInfo.getId());
}else {
mobilePositionMap.remove(subscribeInfo.getId());
}
}
}
}

View File

@ -27,6 +27,7 @@ public class SubscribeInfo {
private int expires;
private String eventId;
private String eventType;
private String type;
private SIPResponse response;
/**
@ -138,4 +139,12 @@ public class SubscribeInfo {
public void setSimulatedToTag(String simulatedToTag) {
this.simulatedToTag = simulatedToTag;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
}

View File

@ -1,8 +1,5 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.CmdType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
@ -24,6 +21,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
@ -64,14 +62,39 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
sipProcessorObserver.addRequestProcessor(method, this);
}
/**
* 处理SUBSCRIBE请求
*
/**
* 处理SUBSCRIBE请求
*
* @param evt 事件
*/
@Override
public void process(RequestEvent evt) {
SIPRequest request = (SIPRequest) evt.getRequest();
// 判断是否为续订或者取消订阅请求
String callId = request.getCallIdHeader().getCallId();
if (!StringUtils.hasLength(callId)) {
logger.info("[订阅消息]: callID丢失: \r\n {}", request);
return;
}
SubscribeInfo subscribeInfo = subscribeHolder.getSubscribeByCallId(callId);
if (subscribeInfo != null) {
int expires = request.getExpires().getExpires();
if (expires == 0) {
logger.info("[取消订阅]: callID: {}", callId);
// 移除订阅
}else {
logger.info("[订阅续订]: callID: {}, 有效期: {}", callId, expires);
// 增加存活时间
}
subscribeHolder.expires(subscribeInfo, expires);
try {
// 回复200 OK
responseAck(request, Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 订阅消息 应答消息 200: {}", e.getMessage());
}
return;
}
try {
Element rootElement = getRootElement(evt);
if (rootElement == null) {
@ -117,9 +140,9 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
if (platform == null) {
return;
}
subscribeInfo.setType("MobilePosition");
logger.info("[上级的移动位置订阅请求]: {}/{} callID: {}, 有效期: {}", platformId, deviceId, request.getCallIdHeader().getCallId(), subscribeInfo.getExpires());
String sn = XmlUtil.getText(rootElement, "SN");
logger.info("[回复上级的移动位置订阅请求]: {}", platformId);
StringBuilder resultXml = new StringBuilder(200);
resultXml.append("<?xml version=\"1.0\" ?>\r\n")
.append("<Response>\r\n")
@ -165,6 +188,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
if (request == null) {
return;
}
String platformId = SipUtils.getUserIdFromFromHeader(request);
String deviceId = XmlUtil.getText(rootElement, "DeviceID");
ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
@ -172,9 +196,9 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
return;
}
SubscribeInfo subscribeInfo = new SubscribeInfo(request, platformId);
logger.info("[上级的目录订阅请求]: {}/{} callID: {}, 有效期: {}", platformId, deviceId, request.getCallIdHeader().getCallId(), subscribeInfo.getExpires());
subscribeInfo.setType("Catalog");
String sn = XmlUtil.getText(rootElement, "SN");
logger.info("[回复上级的目录订阅请求]: {}/{}", platformId, deviceId);
StringBuilder resultXml = new StringBuilder(200);
resultXml.append("<?xml version=\"1.0\" ?>\r\n")
.append("<Response>\r\n")

View File

@ -300,6 +300,7 @@ public class PlatformServiceImpl implements IPlatformService {
subscribeInfo.setId(parentPlatform.getServerGBId());
subscribeInfo.setExpires(-1);
subscribeInfo.setEventType("Catalog");
subscribeInfo.setType("Catalog");
int random = (int) Math.floor(Math.random() * 10000);
subscribeInfo.setEventId(random + "");
subscribeInfo.setSimulatedCallId(UUID.randomUUID().toString().replace("-", "") + "@" + parentPlatform.getServerIP());