mirror of
https://gitee.com/pan648540858/wvp-GB28181-pro.git
synced 2026-05-24 14:07:50 +08:00
Compare commits
3 Commits
e59b300c2e
...
29321beeaf
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
29321beeaf | ||
|
|
1ca0ea78b8 | ||
|
|
82c2cc9bd2 |
@ -4,6 +4,8 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
||||
import com.genersoft.iot.vmp.conf.DynamicTask;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@ -17,6 +19,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
@Component
|
||||
public class SubscribeHolder {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SubscribeHolder.class);
|
||||
@Autowired
|
||||
private DynamicTask dynamicTask;
|
||||
|
||||
@ -33,10 +36,10 @@ public class SubscribeHolder {
|
||||
catalogMap.put(platformId, subscribeInfo);
|
||||
if (subscribeInfo.getExpires() > 0) {
|
||||
// 添加订阅到期
|
||||
String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId;
|
||||
String taskOverdueKey = taskOverduePrefix + "Catalog_" + platformId;
|
||||
// 添加任务处理订阅过期
|
||||
dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()),
|
||||
subscribeInfo.getExpires() * 1000);
|
||||
subscribeInfo.getExpires() * 1000 + 1000);
|
||||
}
|
||||
}
|
||||
|
||||
@ -47,7 +50,7 @@ public class SubscribeHolder {
|
||||
public void removeCatalogSubscribe(String platformId) {
|
||||
|
||||
catalogMap.remove(platformId);
|
||||
String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId;
|
||||
String taskOverdueKey = taskOverduePrefix + "Catalog_" + platformId;
|
||||
Runnable runnable = dynamicTask.get(taskOverdueKey);
|
||||
if (runnable instanceof ISubscribeTask) {
|
||||
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
|
||||
@ -62,21 +65,21 @@ public class SubscribeHolder {
|
||||
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "MobilePosition_" + platformId;
|
||||
// 添加任务处理GPS定时推送
|
||||
|
||||
int cycleForCatalog;
|
||||
int cycle;
|
||||
if (subscribeInfo.getGpsInterval() <= 0) {
|
||||
cycleForCatalog = 5;
|
||||
cycle = 5;
|
||||
}else {
|
||||
cycleForCatalog = subscribeInfo.getGpsInterval();
|
||||
cycle = subscribeInfo.getGpsInterval();
|
||||
}
|
||||
dynamicTask.startCron(key, gpsTask,
|
||||
cycleForCatalog * 1000);
|
||||
dynamicTask.startCron(key, gpsTask, cycle * 1000);
|
||||
String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId;
|
||||
if (subscribeInfo.getExpires() > 0) {
|
||||
// 添加任务处理订阅过期
|
||||
dynamicTask.startDelay(taskOverdueKey, () -> {
|
||||
log.info("[移动位置订阅] 到期,{}, callId: {}", subscribeInfo.getId(), subscribeInfo.getRequest().getCallId());
|
||||
removeMobilePositionSubscribe(subscribeInfo.getId());
|
||||
},
|
||||
subscribeInfo.getExpires() * 1000);
|
||||
subscribeInfo.getExpires() * 1000 + 1000);
|
||||
}
|
||||
}
|
||||
|
||||
@ -85,6 +88,7 @@ public class SubscribeHolder {
|
||||
}
|
||||
|
||||
public void removeMobilePositionSubscribe(String platformId) {
|
||||
|
||||
mobilePositionMap.remove(platformId);
|
||||
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "MobilePosition_" + platformId;
|
||||
// 结束任务处理GPS定时推送
|
||||
@ -123,4 +127,58 @@ public class SubscribeHolder {
|
||||
removeMobilePositionSubscribe(platformId);
|
||||
removeCatalogSubscribe(platformId);
|
||||
}
|
||||
|
||||
public SubscribeInfo getSubscribeByCallId(String callId) {
|
||||
for (SubscribeInfo subscribeInfo : catalogMap.values()) {
|
||||
if (subscribeInfo.getRequest() != null && subscribeInfo.getRequest().getCallIdHeader().getCallId().equals(callId)){
|
||||
return subscribeInfo;
|
||||
}
|
||||
}
|
||||
for (SubscribeInfo subscribeInfo : mobilePositionMap.values()) {
|
||||
if (subscribeInfo.getRequest() != null && subscribeInfo.getRequest().getCallIdHeader().getCallId().equals(callId)){
|
||||
return subscribeInfo;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public void expires(SubscribeInfo subscribeInfo, int expires) {
|
||||
|
||||
String taskOverdueKey = taskOverduePrefix + subscribeInfo.getType() + "_" + subscribeInfo.getId();
|
||||
if (expires > 0) {
|
||||
subscribeInfo.setExpires(expires);
|
||||
// 添加任务处理订阅过期
|
||||
dynamicTask.startDelay(taskOverdueKey, () -> {
|
||||
if ("Catalog".equals(subscribeInfo.getType())) {
|
||||
catalogMap.remove(subscribeInfo.getId());
|
||||
log.info("[目录订阅] 到期,{}, callId: {}", subscribeInfo.getId(), subscribeInfo.getRequest().getCallId());
|
||||
removeCatalogSubscribe(subscribeInfo.getId());
|
||||
}else {
|
||||
log.info("[移动位置订阅] 到期,{}, callId: {}", subscribeInfo.getId(), subscribeInfo.getRequest().getCallId());
|
||||
removeMobilePositionSubscribe(subscribeInfo.getId());
|
||||
}
|
||||
},
|
||||
expires * 1000 + 1000);
|
||||
if ("Catalog".equals(subscribeInfo.getType())) {
|
||||
catalogMap.put(subscribeInfo.getId(), subscribeInfo);
|
||||
}else {
|
||||
mobilePositionMap.put(subscribeInfo.getId(), subscribeInfo);
|
||||
}
|
||||
}else {
|
||||
Runnable runnable = dynamicTask.get(taskOverdueKey);
|
||||
if (runnable instanceof ISubscribeTask) {
|
||||
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
|
||||
subscribeTask.stop(null);
|
||||
}
|
||||
dynamicTask.stop(taskOverdueKey);
|
||||
|
||||
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + subscribeInfo.getType() + "_" + subscribeInfo.getId();
|
||||
dynamicTask.stop(key);
|
||||
if ("Catalog".equals(subscribeInfo.getType())) {
|
||||
catalogMap.remove(subscribeInfo.getId());
|
||||
}else {
|
||||
mobilePositionMap.remove(subscribeInfo.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -27,6 +27,7 @@ public class SubscribeInfo {
|
||||
private int expires;
|
||||
private String eventId;
|
||||
private String eventType;
|
||||
private String type;
|
||||
private SIPResponse response;
|
||||
|
||||
/**
|
||||
@ -138,4 +139,12 @@ public class SubscribeInfo {
|
||||
public void setSimulatedToTag(String simulatedToTag) {
|
||||
this.simulatedToTag = simulatedToTag;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public void setType(String type) {
|
||||
this.type = type;
|
||||
}
|
||||
}
|
||||
|
||||
@ -14,7 +14,6 @@ import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
|
||||
import com.genersoft.iot.vmp.service.IDeviceChannelService;
|
||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||
import gov.nist.javax.sip.message.SIPRequest;
|
||||
import org.dom4j.DocumentException;
|
||||
import org.dom4j.Element;
|
||||
import org.slf4j.Logger;
|
||||
@ -25,14 +24,11 @@ import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import javax.sip.RequestEvent;
|
||||
import javax.sip.header.CSeqHeader;
|
||||
import javax.sip.header.FromHeader;
|
||||
import javax.sip.message.Request;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
/**
|
||||
* SIP命令类型: NOTIFY请求中的目录请求处理
|
||||
@ -142,6 +138,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
|
||||
channel.setParentId(null);
|
||||
}
|
||||
channel.setDeviceId(device.getDeviceId());
|
||||
channel.setUpdateTime(DateUtil.getNow());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
|
||||
}
|
||||
|
||||
@ -21,7 +21,6 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
import javax.sip.RequestEvent;
|
||||
@ -151,21 +150,13 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
|
||||
mobilePosition.setAltitude(0.0);
|
||||
}
|
||||
break;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
|
||||
logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
|
||||
mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
|
||||
mobilePosition.setReportSource("Mobile Position");
|
||||
|
||||
mobilePositionService.add(mobilePosition);
|
||||
// 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
|
||||
try {
|
||||
eventPublisher.mobilePositionEventPublish(mobilePosition);
|
||||
}catch (Exception e) {
|
||||
logger.error("[向上级转发移动位置失败] ", e);
|
||||
}
|
||||
if (mobilePosition.getChannelId() == null || mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) ) {
|
||||
List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId());
|
||||
channels.forEach(channel -> {
|
||||
@ -194,6 +185,15 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
|
||||
jsonObject.put("speed", mobilePosition.getSpeed());
|
||||
redisCatchStorage.sendMobilePositionMsg(jsonObject);
|
||||
}
|
||||
|
||||
mobilePositionService.add(mobilePosition);
|
||||
// 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
|
||||
try {
|
||||
eventPublisher.mobilePositionEventPublish(mobilePosition);
|
||||
}catch (Exception e) {
|
||||
logger.error("[向上级转发移动位置失败] ", e);
|
||||
}
|
||||
|
||||
} catch (DocumentException e) {
|
||||
logger.error("未处理的异常 ", e);
|
||||
}
|
||||
|
||||
@ -1,8 +1,5 @@
|
||||
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
|
||||
|
||||
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
||||
import com.genersoft.iot.vmp.conf.DynamicTask;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.CmdType;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
|
||||
@ -24,6 +21,7 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import javax.sip.InvalidArgumentException;
|
||||
import javax.sip.RequestEvent;
|
||||
@ -64,14 +62,39 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
|
||||
sipProcessorObserver.addRequestProcessor(method, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理SUBSCRIBE请求
|
||||
*
|
||||
/**
|
||||
* 处理SUBSCRIBE请求
|
||||
*
|
||||
* @param evt 事件
|
||||
*/
|
||||
@Override
|
||||
public void process(RequestEvent evt) {
|
||||
SIPRequest request = (SIPRequest) evt.getRequest();
|
||||
// 判断是否为续订或者取消订阅请求
|
||||
String callId = request.getCallIdHeader().getCallId();
|
||||
if (!StringUtils.hasLength(callId)) {
|
||||
logger.info("[订阅消息]: callID丢失: \r\n {}", request);
|
||||
return;
|
||||
}
|
||||
SubscribeInfo subscribeInfo = subscribeHolder.getSubscribeByCallId(callId);
|
||||
if (subscribeInfo != null) {
|
||||
int expires = request.getExpires().getExpires();
|
||||
if (expires == 0) {
|
||||
logger.info("[取消订阅]: callID: {}", callId);
|
||||
// 移除订阅
|
||||
}else {
|
||||
logger.info("[订阅续订]: callID: {}, 有效期: {}", callId, expires);
|
||||
// 增加存活时间
|
||||
}
|
||||
subscribeHolder.expires(subscribeInfo, expires);
|
||||
try {
|
||||
// 回复200 OK
|
||||
responseAck(request, Response.OK);
|
||||
} catch (SipException | InvalidArgumentException | ParseException e) {
|
||||
logger.error("[命令发送失败] 订阅消息 应答消息 200: {}", e.getMessage());
|
||||
}
|
||||
return;
|
||||
}
|
||||
try {
|
||||
Element rootElement = getRootElement(evt);
|
||||
if (rootElement == null) {
|
||||
@ -117,9 +140,9 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
|
||||
if (platform == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
subscribeInfo.setType("MobilePosition");
|
||||
logger.info("[上级的移动位置订阅请求]: {}/{}, callID: {}, 有效期: {}", platformId, deviceId, request.getCallIdHeader().getCallId(), subscribeInfo.getExpires());
|
||||
String sn = XmlUtil.getText(rootElement, "SN");
|
||||
logger.info("[回复上级的移动位置订阅请求]: {}", platformId);
|
||||
StringBuilder resultXml = new StringBuilder(200);
|
||||
resultXml.append("<?xml version=\"1.0\" ?>\r\n")
|
||||
.append("<Response>\r\n")
|
||||
@ -165,6 +188,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
|
||||
if (request == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
String platformId = SipUtils.getUserIdFromFromHeader(request);
|
||||
String deviceId = XmlUtil.getText(rootElement, "DeviceID");
|
||||
ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
|
||||
@ -172,9 +196,9 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
|
||||
return;
|
||||
}
|
||||
SubscribeInfo subscribeInfo = new SubscribeInfo(request, platformId);
|
||||
|
||||
logger.info("[上级的目录订阅请求]: {}/{}, callID: {}, 有效期: {}", platformId, deviceId, request.getCallIdHeader().getCallId(), subscribeInfo.getExpires());
|
||||
subscribeInfo.setType("Catalog");
|
||||
String sn = XmlUtil.getText(rootElement, "SN");
|
||||
logger.info("[回复上级的目录订阅请求]: {}/{}", platformId, deviceId);
|
||||
StringBuilder resultXml = new StringBuilder(200);
|
||||
resultXml.append("<?xml version=\"1.0\" ?>\r\n")
|
||||
.append("<Response>\r\n")
|
||||
|
||||
@ -300,6 +300,7 @@ public class PlatformServiceImpl implements IPlatformService {
|
||||
subscribeInfo.setId(parentPlatform.getServerGBId());
|
||||
subscribeInfo.setExpires(-1);
|
||||
subscribeInfo.setEventType("Catalog");
|
||||
subscribeInfo.setType("Catalog");
|
||||
int random = (int) Math.floor(Math.random() * 10000);
|
||||
subscribeInfo.setEventId(random + "");
|
||||
subscribeInfo.setSimulatedCallId(UUID.randomUUID().toString().replace("-", "") + "@" + parentPlatform.getServerIP());
|
||||
|
||||
@ -570,7 +570,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
|
||||
@Override
|
||||
public void sendMobilePositionMsg(JSONObject jsonObject) {
|
||||
String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION;
|
||||
logger.debug("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString());
|
||||
logger.info("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString());
|
||||
redisTemplate.convertAndSend(key, jsonObject);
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user