diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index a15de224a..21cdd837a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -4,6 +4,8 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -17,6 +19,7 @@ import java.util.concurrent.ConcurrentHashMap; @Component public class SubscribeHolder { + private static final Logger log = LoggerFactory.getLogger(SubscribeHolder.class); @Autowired private DynamicTask dynamicTask; @@ -33,10 +36,10 @@ public class SubscribeHolder { catalogMap.put(platformId, subscribeInfo); if (subscribeInfo.getExpires() > 0) { // 添加订阅到期 - String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId; + String taskOverdueKey = taskOverduePrefix + "Catalog_" + platformId; // 添加任务处理订阅过期 dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()), - subscribeInfo.getExpires() * 1000); + subscribeInfo.getExpires() * 1000 + 1000); } } @@ -47,7 +50,7 @@ public class SubscribeHolder { public void removeCatalogSubscribe(String platformId) { catalogMap.remove(platformId); - String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId; + String taskOverdueKey = taskOverduePrefix + "Catalog_" + platformId; Runnable runnable = dynamicTask.get(taskOverdueKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; @@ -62,21 +65,21 @@ public class SubscribeHolder { String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "MobilePosition_" + platformId; // 添加任务处理GPS定时推送 - int cycleForCatalog; + int cycle; if (subscribeInfo.getGpsInterval() <= 0) { - cycleForCatalog = 5; + cycle = 5; }else { - cycleForCatalog = subscribeInfo.getGpsInterval(); + cycle = subscribeInfo.getGpsInterval(); } - dynamicTask.startCron(key, gpsTask, - cycleForCatalog * 1000); + dynamicTask.startCron(key, gpsTask, cycle * 1000); String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId; if (subscribeInfo.getExpires() > 0) { // 添加任务处理订阅过期 dynamicTask.startDelay(taskOverdueKey, () -> { + log.info("[移动位置订阅] 到期,{}, callId: {}", subscribeInfo.getId(), subscribeInfo.getRequest().getCallId()); removeMobilePositionSubscribe(subscribeInfo.getId()); }, - subscribeInfo.getExpires() * 1000); + subscribeInfo.getExpires() * 1000 + 1000); } } @@ -85,6 +88,7 @@ public class SubscribeHolder { } public void removeMobilePositionSubscribe(String platformId) { + mobilePositionMap.remove(platformId); String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "MobilePosition_" + platformId; // 结束任务处理GPS定时推送 @@ -123,4 +127,58 @@ public class SubscribeHolder { removeMobilePositionSubscribe(platformId); removeCatalogSubscribe(platformId); } + + public SubscribeInfo getSubscribeByCallId(String callId) { + for (SubscribeInfo subscribeInfo : catalogMap.values()) { + if (subscribeInfo.getRequest() != null && subscribeInfo.getRequest().getCallIdHeader().getCallId().equals(callId)){ + return subscribeInfo; + } + } + for (SubscribeInfo subscribeInfo : mobilePositionMap.values()) { + if (subscribeInfo.getRequest() != null && subscribeInfo.getRequest().getCallIdHeader().getCallId().equals(callId)){ + return subscribeInfo; + } + } + return null; + } + + public void expires(SubscribeInfo subscribeInfo, int expires) { + + String taskOverdueKey = taskOverduePrefix + subscribeInfo.getType() + "_" + subscribeInfo.getId(); + if (expires > 0) { + subscribeInfo.setExpires(expires); + // 添加任务处理订阅过期 + dynamicTask.startDelay(taskOverdueKey, () -> { + if ("Catalog".equals(subscribeInfo.getType())) { + catalogMap.remove(subscribeInfo.getId()); + log.info("[目录订阅] 到期,{}, callId: {}", subscribeInfo.getId(), subscribeInfo.getRequest().getCallId()); + removeCatalogSubscribe(subscribeInfo.getId()); + }else { + log.info("[移动位置订阅] 到期,{}, callId: {}", subscribeInfo.getId(), subscribeInfo.getRequest().getCallId()); + removeMobilePositionSubscribe(subscribeInfo.getId()); + } + }, + expires * 1000 + 1000); + if ("Catalog".equals(subscribeInfo.getType())) { + catalogMap.put(subscribeInfo.getId(), subscribeInfo); + }else { + mobilePositionMap.put(subscribeInfo.getId(), subscribeInfo); + } + }else { + Runnable runnable = dynamicTask.get(taskOverdueKey); + if (runnable instanceof ISubscribeTask) { + ISubscribeTask subscribeTask = (ISubscribeTask) runnable; + subscribeTask.stop(null); + } + dynamicTask.stop(taskOverdueKey); + + String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + subscribeInfo.getType() + "_" + subscribeInfo.getId(); + dynamicTask.stop(key); + if ("Catalog".equals(subscribeInfo.getType())) { + catalogMap.remove(subscribeInfo.getId()); + }else { + mobilePositionMap.remove(subscribeInfo.getId()); + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java index e5c504559..17e792d92 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java @@ -27,6 +27,7 @@ public class SubscribeInfo { private int expires; private String eventId; private String eventType; + private String type; private SIPResponse response; /** @@ -138,4 +139,12 @@ public class SubscribeInfo { public void setSimulatedToTag(String simulatedToTag) { this.simulatedToTag = simulatedToTag; } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java index c392a129d..6fab8afb6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java @@ -1,8 +1,5 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.CmdType; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; @@ -24,6 +21,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; @@ -64,14 +62,39 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme sipProcessorObserver.addRequestProcessor(method, this); } - /** - * 处理SUBSCRIBE请求 - * + /** + * 处理SUBSCRIBE请求 + * * @param evt 事件 */ @Override public void process(RequestEvent evt) { SIPRequest request = (SIPRequest) evt.getRequest(); + // 判断是否为续订或者取消订阅请求 + String callId = request.getCallIdHeader().getCallId(); + if (!StringUtils.hasLength(callId)) { + logger.info("[订阅消息]: callID丢失: \r\n {}", request); + return; + } + SubscribeInfo subscribeInfo = subscribeHolder.getSubscribeByCallId(callId); + if (subscribeInfo != null) { + int expires = request.getExpires().getExpires(); + if (expires == 0) { + logger.info("[取消订阅]: callID: {}", callId); + // 移除订阅 + }else { + logger.info("[订阅续订]: callID: {}, 有效期: {}", callId, expires); + // 增加存活时间 + } + subscribeHolder.expires(subscribeInfo, expires); + try { + // 回复200 OK + responseAck(request, Response.OK); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 订阅消息 应答消息 200: {}", e.getMessage()); + } + return; + } try { Element rootElement = getRootElement(evt); if (rootElement == null) { @@ -117,9 +140,9 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme if (platform == null) { return; } - + subscribeInfo.setType("MobilePosition"); + logger.info("[上级的移动位置订阅请求]: {}/{}, callID: {}, 有效期: {}", platformId, deviceId, request.getCallIdHeader().getCallId(), subscribeInfo.getExpires()); String sn = XmlUtil.getText(rootElement, "SN"); - logger.info("[回复上级的移动位置订阅请求]: {}", platformId); StringBuilder resultXml = new StringBuilder(200); resultXml.append("\r\n") .append("\r\n") @@ -165,6 +188,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme if (request == null) { return; } + String platformId = SipUtils.getUserIdFromFromHeader(request); String deviceId = XmlUtil.getText(rootElement, "DeviceID"); ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); @@ -172,9 +196,9 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme return; } SubscribeInfo subscribeInfo = new SubscribeInfo(request, platformId); - + logger.info("[上级的目录订阅请求]: {}/{}, callID: {}, 有效期: {}", platformId, deviceId, request.getCallIdHeader().getCallId(), subscribeInfo.getExpires()); + subscribeInfo.setType("Catalog"); String sn = XmlUtil.getText(rootElement, "SN"); - logger.info("[回复上级的目录订阅请求]: {}/{}", platformId, deviceId); StringBuilder resultXml = new StringBuilder(200); resultXml.append("\r\n") .append("\r\n") diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index d1b68d439..817cfc6b3 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -300,6 +300,7 @@ public class PlatformServiceImpl implements IPlatformService { subscribeInfo.setId(parentPlatform.getServerGBId()); subscribeInfo.setExpires(-1); subscribeInfo.setEventType("Catalog"); + subscribeInfo.setType("Catalog"); int random = (int) Math.floor(Math.random() * 10000); subscribeInfo.setEventId(random + ""); subscribeInfo.setSimulatedCallId(UUID.randomUUID().toString().replace("-", "") + "@" + parentPlatform.getServerIP());