mirror of
https://gitee.com/pan648540858/wvp-GB28181-pro.git
synced 2026-05-23 21:47:49 +08:00
国标级联调整心跳逻辑
This commit is contained in:
parent
b0b5a0f5e0
commit
243f7f62b7
@ -1,13 +1,13 @@
|
|||||||
package com.genersoft.iot.vmp.common;
|
package com.genersoft.iot.vmp.common;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @description: 定义常量
|
* @description: 定义常量
|
||||||
* @author: swwheihei
|
* @author: swwheihei
|
||||||
* @date: 2019年5月30日 下午3:04:04
|
* @date: 2019年5月30日 下午3:04:04
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public class VideoManagerConstants {
|
public class VideoManagerConstants {
|
||||||
|
|
||||||
public static final String WVP_SERVER_PREFIX = "VMP_SIGNALLING_SERVER_INFO_";
|
public static final String WVP_SERVER_PREFIX = "VMP_SIGNALLING_SERVER_INFO_";
|
||||||
|
|
||||||
public static final String WVP_SERVER_LIST = "VMP_SERVER_LIST";
|
public static final String WVP_SERVER_LIST = "VMP_SERVER_LIST";
|
||||||
@ -22,10 +22,6 @@ public class VideoManagerConstants {
|
|||||||
|
|
||||||
public static final String INVITE_PREFIX = "VMP_GB_INVITE_INFO";
|
public static final String INVITE_PREFIX = "VMP_GB_INVITE_INFO";
|
||||||
|
|
||||||
public static final String PLATFORM_CATCH_PREFIX = "VMP_PLATFORM_CATCH_";
|
|
||||||
|
|
||||||
public static final String PLATFORM_REGISTER_INFO_PREFIX = "VMP_PLATFORM_REGISTER_INFO_";
|
|
||||||
|
|
||||||
public static final String SEND_RTP_PORT = "VM_SEND_RTP_PORT:";
|
public static final String SEND_RTP_PORT = "VM_SEND_RTP_PORT:";
|
||||||
public static final String SEND_RTP_INFO_CALLID = "VMP_SEND_RTP_INFO:CALL_ID:";
|
public static final String SEND_RTP_INFO_CALLID = "VMP_SEND_RTP_INFO:CALL_ID:";
|
||||||
public static final String SEND_RTP_INFO_STREAM = "VMP_SEND_RTP_INFO:STREAM:";
|
public static final String SEND_RTP_INFO_STREAM = "VMP_SEND_RTP_INFO:STREAM:";
|
||||||
|
|||||||
@ -10,6 +10,8 @@ public class SipTransactionInfo {
|
|||||||
private String fromTag;
|
private String fromTag;
|
||||||
private String toTag;
|
private String toTag;
|
||||||
private String viaBranch;
|
private String viaBranch;
|
||||||
|
private int expires;
|
||||||
|
private String user;
|
||||||
|
|
||||||
// 自己是否媒体流发送者
|
// 自己是否媒体流发送者
|
||||||
private boolean asSender;
|
private boolean asSender;
|
||||||
|
|||||||
@ -101,7 +101,7 @@ public interface PlatformMapper {
|
|||||||
@Select("SELECT * FROM wvp_platform ")
|
@Select("SELECT * FROM wvp_platform ")
|
||||||
List<Platform> queryAll();
|
List<Platform> queryAll();
|
||||||
|
|
||||||
@Select("SELECT * FROM wvp_platform WHERE enable=true and server_id == #{serverId} group by server_id")
|
@Select("SELECT * FROM wvp_platform WHERE enable=true and server_id = #{serverId}")
|
||||||
List<Platform> queryServerIdsWithEnableAndServer(@Param("serverId") String serverId);
|
List<Platform> queryServerIdsWithEnableAndServer(@Param("serverId") String serverId);
|
||||||
|
|
||||||
@Update("UPDATE wvp_platform SET status=false" )
|
@Update("UPDATE wvp_platform SET status=false" )
|
||||||
|
|||||||
@ -53,7 +53,7 @@ public interface IPlatformService {
|
|||||||
* 平台离线
|
* 平台离线
|
||||||
* @param parentPlatform 平台信息
|
* @param parentPlatform 平台信息
|
||||||
*/
|
*/
|
||||||
void offline(Platform parentPlatform, boolean stopRegisterTask);
|
void offline(Platform parentPlatform);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 向上级平台发送位置订阅
|
* 向上级平台发送位置订阅
|
||||||
|
|||||||
@ -38,6 +38,7 @@ import gov.nist.javax.sip.message.SIPResponse;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.CommandLineRunner;
|
import org.springframework.boot.CommandLineRunner;
|
||||||
|
import org.springframework.core.annotation.Order;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
@ -59,6 +60,7 @@ import java.util.concurrent.TimeUnit;
|
|||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
|
@Order(value=16)
|
||||||
public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@ -707,6 +709,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
public void subscribeMobilePosition(int id, int cycle, int interval) {
|
public void subscribeMobilePosition(int id, int cycle, int interval) {
|
||||||
Device device = deviceMapper.query(id);
|
Device device = deviceMapper.query(id);
|
||||||
Assert.notNull(device, "未找到设备");
|
Assert.notNull(device, "未找到设备");
|
||||||
|
|
||||||
if (device.getSubscribeCycleForMobilePosition() == cycle) {
|
if (device.getSubscribeCycleForMobilePosition() == cycle) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -729,6 +732,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
// 订阅未开启
|
// 订阅未开启
|
||||||
device.setSubscribeCycleForMobilePosition(cycle);
|
device.setSubscribeCycleForMobilePosition(cycle);
|
||||||
device.setMobilePositionSubmissionInterval(interval);
|
device.setMobilePositionSubmissionInterval(interval);
|
||||||
|
updateDevice(device);
|
||||||
// 开启订阅
|
// 开启订阅
|
||||||
addMobilePositionSubscribe(device, null);
|
addMobilePositionSubscribe(device, null);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -38,6 +38,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.CommandLineRunner;
|
import org.springframework.boot.CommandLineRunner;
|
||||||
import org.springframework.context.event.EventListener;
|
import org.springframework.context.event.EventListener;
|
||||||
|
import org.springframework.core.annotation.Order;
|
||||||
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.scheduling.annotation.Async;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
@ -59,6 +60,7 @@ import java.util.concurrent.TimeUnit;
|
|||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
|
@Order(value=15)
|
||||||
public class PlatformServiceImpl implements IPlatformService, CommandLineRunner {
|
public class PlatformServiceImpl implements IPlatformService, CommandLineRunner {
|
||||||
|
|
||||||
private final static String REGISTER_KEY_PREFIX = "platform_register_";
|
private final static String REGISTER_KEY_PREFIX = "platform_register_";
|
||||||
@ -142,14 +144,25 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
for (Platform platform : platformList) {
|
for (Platform platform : platformList) {
|
||||||
sendRegister(platform, null);
|
if (statusTaskRunner.containsRegister(platform.getServerGBId()) && statusTaskRunner.containsKeepAlive(platform.getServerGBId())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (statusTaskRunner.containsRegister(platform.getServerGBId())) {
|
||||||
|
SipTransactionInfo transactionInfo = statusTaskRunner.getRegisterTransactionInfo(platform.getServerGBId());
|
||||||
|
// 注销后出发平台离线, 如果是启用的平台,那么下次丢失检测会检测到并重新注册上线
|
||||||
|
sendUnRegister(platform, transactionInfo);
|
||||||
|
}else {
|
||||||
|
statusTaskRunner.removeKeepAliveTask(platform.getServerGBId());
|
||||||
|
sendRegister(platform, null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendRegister(Platform platform, SipTransactionInfo sipTransactionInfo) {
|
private void sendRegister(Platform platform, SipTransactionInfo sipTransactionInfo) {
|
||||||
try {
|
try {
|
||||||
commanderForPlatform.register(platform, sipTransactionInfo, eventResult -> {
|
commanderForPlatform.register(platform, sipTransactionInfo, eventResult -> {
|
||||||
log.info("[国标级联] {}({}),添加向上级注册失败,请确定上级平台可用时重新保存", platform.getName(), platform.getServerGBId());
|
log.info("[国标级联] {}({}),注册失败", platform.getName(), platform.getServerGBId());
|
||||||
|
offline(platform);
|
||||||
}, null);
|
}, null);
|
||||||
} catch (InvalidArgumentException | ParseException | SipException e) {
|
} catch (InvalidArgumentException | ParseException | SipException e) {
|
||||||
log.error("[命令发送失败] 国标级联: {}", e.getMessage());
|
log.error("[命令发送失败] 国标级联: {}", e.getMessage());
|
||||||
@ -339,6 +352,8 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
return result > 0;
|
return result > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean update(Platform platform) {
|
public boolean update(Platform platform) {
|
||||||
Assert.isTrue(platform.getId() > 0, "ID必须存在");
|
Assert.isTrue(platform.getId() > 0, "ID必须存在");
|
||||||
@ -366,13 +381,13 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
@Override
|
@Override
|
||||||
public void online(Platform platform, SipTransactionInfo sipTransactionInfo) {
|
public void online(Platform platform, SipTransactionInfo sipTransactionInfo) {
|
||||||
log.info("[国标级联]:{}, 平台上线", platform.getServerGBId());
|
log.info("[国标级联]:{}, 平台上线", platform.getServerGBId());
|
||||||
PlatformRegisterTask registerTask = new PlatformRegisterTask(platform.getServerId(), platform.getExpires() * 1000L - 500L,
|
PlatformRegisterTask registerTask = new PlatformRegisterTask(platform.getServerGBId(), platform.getExpires() * 1000L - 500L,
|
||||||
sipTransactionInfo, (platformServerGbId) -> {
|
sipTransactionInfo, (platformServerGbId) -> {
|
||||||
this.registerExpire(platformServerGbId, sipTransactionInfo);
|
this.registerExpire(platformServerGbId, sipTransactionInfo);
|
||||||
});
|
});
|
||||||
statusTaskRunner.addRegisterTask(registerTask);
|
statusTaskRunner.addRegisterTask(registerTask);
|
||||||
|
|
||||||
PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerId(), platform.getKeepTimeout() * 1000L,
|
PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerGBId(), platform.getKeepTimeout() * 1000L,
|
||||||
this::keepaliveExpire);
|
this::keepaliveExpire);
|
||||||
statusTaskRunner.addKeepAliveTask(keepaliveTask);
|
statusTaskRunner.addKeepAliveTask(keepaliveTask);
|
||||||
|
|
||||||
@ -421,31 +436,31 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
// 心跳超时失败
|
// 心跳超时失败
|
||||||
if (failCount < 2) {
|
if (failCount < 2) {
|
||||||
log.info("[国标级联] 心跳发送超时, 平台服务编号: {}", platformServerId);
|
log.info("[国标级联] 心跳发送超时, 平台服务编号: {}", platformServerId);
|
||||||
PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerId(), platform.getKeepTimeout() * 1000L,
|
PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerGBId(), platform.getKeepTimeout() * 1000L,
|
||||||
this::keepaliveExpire);
|
this::keepaliveExpire);
|
||||||
keepaliveTask.setFailCount(failCount + 1);
|
keepaliveTask.setFailCount(failCount + 1);
|
||||||
statusTaskRunner.addKeepAliveTask(keepaliveTask);
|
statusTaskRunner.addKeepAliveTask(keepaliveTask);
|
||||||
}else {
|
}else {
|
||||||
// 心跳超时三次, 不再发送心跳, 平台离线
|
// 心跳超时三次, 不再发送心跳, 平台离线
|
||||||
log.info("[国标级联] 心跳发送超时三次,平台离线, 平台服务编号: {}", platformServerId);
|
log.info("[国标级联] 心跳发送超时三次,平台离线, 平台服务编号: {}", platformServerId);
|
||||||
offline(platform, false);
|
offline(platform);
|
||||||
}
|
}
|
||||||
}, eventResult -> {
|
}, eventResult -> {
|
||||||
PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerId(), platform.getKeepTimeout() * 1000L,
|
PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerGBId(), platform.getKeepTimeout() * 1000L,
|
||||||
this::keepaliveExpire);
|
this::keepaliveExpire);
|
||||||
statusTaskRunner.addKeepAliveTask(keepaliveTask);
|
statusTaskRunner.addKeepAliveTask(keepaliveTask);
|
||||||
});
|
});
|
||||||
} catch (SipException | InvalidArgumentException | ParseException e) {
|
} catch (SipException | InvalidArgumentException | ParseException e) {
|
||||||
log.error("[命令发送失败] 国标级联 发送心跳: {}", e.getMessage());
|
log.error("[命令发送失败] 国标级联 发送心跳: {}", e.getMessage());
|
||||||
if (failCount < 2) {
|
if (failCount < 2) {
|
||||||
PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerId(), platform.getKeepTimeout() * 1000L,
|
PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerGBId(), platform.getKeepTimeout() * 1000L,
|
||||||
this::keepaliveExpire);
|
this::keepaliveExpire);
|
||||||
keepaliveTask.setFailCount(failCount + 1);
|
keepaliveTask.setFailCount(failCount + 1);
|
||||||
statusTaskRunner.addKeepAliveTask(keepaliveTask);
|
statusTaskRunner.addKeepAliveTask(keepaliveTask);
|
||||||
}else {
|
}else {
|
||||||
// 心跳超时三次, 不再发送心跳, 平台离线
|
// 心跳超时三次, 不再发送心跳, 平台离线
|
||||||
log.info("[国标级联] 心跳发送失败三次,平台离线, 平台服务编号: {}", platformServerId);
|
log.info("[国标级联] 心跳发送失败三次,平台离线, 平台服务编号: {}", platformServerId);
|
||||||
offline(platform, false);
|
offline(platform);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -458,7 +473,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void offline(Platform platform, boolean stopRegister) {
|
public void offline(Platform platform) {
|
||||||
log.info("[平台离线]:{}({})", platform.getName(), platform.getServerGBId());
|
log.info("[平台离线]:{}({})", platform.getName(), platform.getServerGBId());
|
||||||
statusTaskRunner.removeRegisterTask(platform.getServerGBId());
|
statusTaskRunner.removeRegisterTask(platform.getServerGBId());
|
||||||
statusTaskRunner.removeKeepAliveTask(platform.getServerGBId());
|
statusTaskRunner.removeKeepAliveTask(platform.getServerGBId());
|
||||||
@ -475,7 +490,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
|
|
||||||
private void stopAllPush(String platformId) {
|
private void stopAllPush(String platformId) {
|
||||||
List<SendRtpInfo> sendRtpItems = sendRtpServerService.queryForPlatform(platformId);
|
List<SendRtpInfo> sendRtpItems = sendRtpServerService.queryForPlatform(platformId);
|
||||||
if (sendRtpItems != null && sendRtpItems.size() > 0) {
|
if (sendRtpItems != null && !sendRtpItems.isEmpty()) {
|
||||||
for (SendRtpInfo sendRtpItem : sendRtpItems) {
|
for (SendRtpInfo sendRtpItem : sendRtpItems) {
|
||||||
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
|
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
|
||||||
sendRtpServerService.delete(sendRtpItem);
|
sendRtpServerService.delete(sendRtpItem);
|
||||||
|
|||||||
@ -38,7 +38,7 @@ public class PlatformRegisterTask implements Delayed {
|
|||||||
|
|
||||||
public PlatformRegisterTask(String platformServerId, long delayTime, SipTransactionInfo sipTransactionInfo, CommonCallback<String> callback) {
|
public PlatformRegisterTask(String platformServerId, long delayTime, SipTransactionInfo sipTransactionInfo, CommonCallback<String> callback) {
|
||||||
this.platformServerId = platformServerId;
|
this.platformServerId = platformServerId;
|
||||||
this.delayTime = System.currentTimeMillis() + delayTime;
|
this.delayTime = System.currentTimeMillis() + delayTime;
|
||||||
this.callback = callback;
|
this.callback = callback;
|
||||||
this.sipTransactionInfo = sipTransactionInfo;
|
this.sipTransactionInfo = sipTransactionInfo;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -147,10 +147,9 @@ public class PlatformStatusTaskRunner {
|
|||||||
|
|
||||||
public boolean removeKeepAliveTask(String platformServerId) {
|
public boolean removeKeepAliveTask(String platformServerId) {
|
||||||
PlatformKeepaliveTask task = keepaliveSubscribes.get(platformServerId);
|
PlatformKeepaliveTask task = keepaliveSubscribes.get(platformServerId);
|
||||||
if (task == null) {
|
if (task != null) {
|
||||||
return false;
|
keepaliveSubscribes.remove(platformServerId);
|
||||||
}
|
}
|
||||||
keepaliveSubscribes.remove(platformServerId);
|
|
||||||
if (keepaliveTaskDelayQueue.contains(task)) {
|
if (keepaliveTaskDelayQueue.contains(task)) {
|
||||||
boolean remove = keepaliveTaskDelayQueue.remove(task);
|
boolean remove = keepaliveTaskDelayQueue.remove(task);
|
||||||
if (!remove) {
|
if (!remove) {
|
||||||
|
|||||||
@ -84,6 +84,11 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
|
|||||||
|
|
||||||
// Success
|
// Success
|
||||||
if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) {
|
if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) {
|
||||||
|
ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(response.getCSeqHeader().getMethod());
|
||||||
|
if (sipRequestProcessor != null) {
|
||||||
|
sipRequestProcessor.process(responseEvent);
|
||||||
|
}
|
||||||
|
|
||||||
CallIdHeader callIdHeader = response.getCallIdHeader();
|
CallIdHeader callIdHeader = response.getCallIdHeader();
|
||||||
CSeqHeader cSeqHeader = response.getCSeqHeader();
|
CSeqHeader cSeqHeader = response.getCSeqHeader();
|
||||||
if (callIdHeader != null) {
|
if (callIdHeader != null) {
|
||||||
@ -96,10 +101,6 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
|
|||||||
sipSubscribe.removeSubscribe(callIdHeader.getCallId() + cSeqHeader.getSeqNumber());
|
sipSubscribe.removeSubscribe(callIdHeader.getCallId() + cSeqHeader.getSeqNumber());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(response.getCSeqHeader().getMethod());
|
|
||||||
if (sipRequestProcessor != null) {
|
|
||||||
sipRequestProcessor.process(responseEvent);
|
|
||||||
}
|
|
||||||
} else if ((status >= Response.TRYING) && (status < Response.OK)) {
|
} else if ((status >= Response.TRYING) && (status < Response.OK)) {
|
||||||
// 增加其它无需回复的响应,如101、180等
|
// 增加其它无需回复的响应,如101、180等
|
||||||
// 更新sip订阅的时间
|
// 更新sip订阅的时间
|
||||||
|
|||||||
@ -8,6 +8,9 @@ import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent;
|
|||||||
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
|
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
|
||||||
import com.genersoft.iot.vmp.utils.GitUtil;
|
import com.genersoft.iot.vmp.utils.GitUtil;
|
||||||
import gov.nist.javax.sip.SipProviderImpl;
|
import gov.nist.javax.sip.SipProviderImpl;
|
||||||
|
import gov.nist.javax.sip.address.SipUri;
|
||||||
|
import gov.nist.javax.sip.message.SIPRequest;
|
||||||
|
import gov.nist.javax.sip.message.SIPResponse;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
@ -37,6 +40,7 @@ public class SIPSender {
|
|||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private SipSubscribe sipSubscribe;
|
private SipSubscribe sipSubscribe;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private SipConfig sipConfig;
|
private SipConfig sipConfig;
|
||||||
|
|
||||||
@ -86,15 +90,25 @@ public class SIPSender {
|
|||||||
}), timeout == null ? sipConfig.getTimeout() : timeout);
|
}), timeout == null ? sipConfig.getTimeout() : timeout);
|
||||||
SipTransactionInfo sipTransactionInfo = new SipTransactionInfo();
|
SipTransactionInfo sipTransactionInfo = new SipTransactionInfo();
|
||||||
sipTransactionInfo.setFromTag(fromHeader.getTag());
|
sipTransactionInfo.setFromTag(fromHeader.getTag());
|
||||||
sipTransactionInfo.setFromTag(fromHeader.getTag());
|
sipTransactionInfo.setCallId(callIdHeader.getCallId());
|
||||||
|
|
||||||
|
if (message instanceof SIPResponse) {
|
||||||
if (message instanceof Response) {
|
SIPResponse response = (SIPResponse) message;
|
||||||
ToHeader toHeader = (ToHeader) message.getHeader(ToHeader.NAME);
|
sipTransactionInfo.setToTag(response.getToHeader().getTag());
|
||||||
sipTransactionInfo.setToTag(toHeader.getTag());
|
sipTransactionInfo.setViaBranch(response.getTopmostViaHeader().getBranch());
|
||||||
|
}else if (message instanceof SIPRequest) {
|
||||||
|
SIPRequest request = (SIPRequest) message;
|
||||||
|
sipTransactionInfo.setViaBranch(request.getTopmostViaHeader().getBranch());
|
||||||
|
SipUri sipUri = (SipUri)request.getRequestLine().getUri();
|
||||||
|
sipTransactionInfo.setUser(sipUri.getUser());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
ExpiresHeader expiresHeader = (ExpiresHeader) message.getHeader(ExpiresHeader.NAME);
|
||||||
|
if (expiresHeader != null) {
|
||||||
|
sipTransactionInfo.setExpires(expiresHeader.getExpires());
|
||||||
|
}
|
||||||
sipEvent.setSipTransactionInfo(sipTransactionInfo);
|
sipEvent.setSipTransactionInfo(sipTransactionInfo);
|
||||||
sipSubscribe.addSubscribe(key, sipEvent);
|
sipSubscribe.addSubscribe(key, sipEvent);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -21,7 +21,6 @@ import com.genersoft.iot.vmp.media.service.IMediaServerService;
|
|||||||
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
|
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
|
||||||
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
|
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
|
||||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||||
import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
|
|
||||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||||
import com.genersoft.iot.vmp.utils.GitUtil;
|
import com.genersoft.iot.vmp.utils.GitUtil;
|
||||||
import gov.nist.javax.sip.message.MessageFactoryImpl;
|
import gov.nist.javax.sip.message.MessageFactoryImpl;
|
||||||
@ -121,9 +120,6 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
|
|||||||
request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform,
|
request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform,
|
||||||
redisCatchStorage.getCSEQ(), fromTag,
|
redisCatchStorage.getCSEQ(), fromTag,
|
||||||
toTag, callIdHeader, isRegister? parentPlatform.getExpires() : 0);
|
toTag, callIdHeader, isRegister? parentPlatform.getExpires() : 0);
|
||||||
// 将 callid 写入缓存, 等注册成功可以更新状态
|
|
||||||
String callIdFromHeader = callIdHeader.getCallId();
|
|
||||||
redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, PlatformRegisterInfo.getInstance(parentPlatform.getServerGBId(), isRegister));
|
|
||||||
}else {
|
}else {
|
||||||
request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, fromTag, toTag, www, callIdHeader, isRegister? parentPlatform.getExpires() : 0);
|
request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, fromTag, toTag, www, callIdHeader, isRegister? parentPlatform.getExpires() : 0);
|
||||||
}
|
}
|
||||||
@ -132,7 +128,6 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
|
|||||||
if (event != null) {
|
if (event != null) {
|
||||||
log.info("[国标级联]:{}, 注册失败: {} ", parentPlatform.getServerGBId(), event.msg);
|
log.info("[国标级联]:{}, 注册失败: {} ", parentPlatform.getServerGBId(), event.msg);
|
||||||
}
|
}
|
||||||
redisCatchStorage.delPlatformRegisterInfo(callIdHeader.getCallId());
|
|
||||||
if (errorEvent != null ) {
|
if (errorEvent != null ) {
|
||||||
errorEvent.response(event);
|
errorEvent.response(event);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,14 +1,14 @@
|
|||||||
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
|
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
|
||||||
|
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.Platform;
|
import com.genersoft.iot.vmp.gb28181.bean.Platform;
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatch;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
|
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
|
||||||
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
|
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
|
||||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
|
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
|
||||||
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
|
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
|
||||||
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
|
|
||||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||||
import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
|
|
||||||
import gov.nist.javax.sip.message.SIPResponse;
|
import gov.nist.javax.sip.message.SIPResponse;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@ -44,6 +44,9 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private IPlatformService platformService;
|
private IPlatformService platformService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private SipSubscribe sipSubscribe;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterPropertiesSet() throws Exception {
|
public void afterPropertiesSet() throws Exception {
|
||||||
// 添加消息处理的订阅
|
// 添加消息处理的订阅
|
||||||
@ -59,23 +62,19 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
|
|||||||
public void process(ResponseEvent evt) {
|
public void process(ResponseEvent evt) {
|
||||||
SIPResponse response = (SIPResponse)evt.getResponse();
|
SIPResponse response = (SIPResponse)evt.getResponse();
|
||||||
String callId = response.getCallIdHeader().getCallId();
|
String callId = response.getCallIdHeader().getCallId();
|
||||||
PlatformRegisterInfo platformRegisterInfo = redisCatchStorage.queryPlatformRegisterInfo(callId);
|
long seqNumber = response.getCSeqHeader().getSeqNumber();
|
||||||
if (platformRegisterInfo == null) {
|
SipEvent subscribe = sipSubscribe.getSubscribe(callId + seqNumber);
|
||||||
log.info(String.format("[国标级联]未找到callId: %s 的注册/注销平台id", callId ));
|
if (subscribe == null || subscribe.getSipTransactionInfo() == null || subscribe.getSipTransactionInfo().getUser() == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
PlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(platformRegisterInfo.getPlatformId());
|
String action = subscribe.getSipTransactionInfo().getExpires() > 0 ? "注册" : "注销";
|
||||||
if (parentPlatformCatch == null) {
|
String platFormServerGbId = subscribe.getSipTransactionInfo().getUser();
|
||||||
log.warn(String.format("[国标级联]收到注册/注销%S请求,平台:%s,但是平台缓存信息未查询到!!!", response.getStatusCode(),platformRegisterInfo.getPlatformId()));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
String action = platformRegisterInfo.isRegister() ? "注册" : "注销";
|
log.info("[国标级联]{} {}响应 {} ", action, response.getStatusCode(), platFormServerGbId);
|
||||||
log.info(String.format("[国标级联]%s %S响应,%s ", action, response.getStatusCode(), platformRegisterInfo.getPlatformId() ));
|
Platform platform = platformService.queryPlatformByServerGBId(platFormServerGbId);
|
||||||
Platform parentPlatform = parentPlatformCatch.getPlatform();
|
if (platform == null) {
|
||||||
if (parentPlatform == null) {
|
log.warn("[国标级联]收到 来自{}的 {} 回复 {}, 但是平台信息未查询到!!!", platFormServerGbId, action, response.getStatusCode());
|
||||||
log.warn(String.format("[国标级联]收到 %s %s的%S请求, 但是平台信息未查询到!!!", platformRegisterInfo.getPlatformId(), action, response.getStatusCode()));
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -83,20 +82,17 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
|
|||||||
WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME);
|
WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME);
|
||||||
SipTransactionInfo sipTransactionInfo = new SipTransactionInfo(response);
|
SipTransactionInfo sipTransactionInfo = new SipTransactionInfo(response);
|
||||||
try {
|
try {
|
||||||
sipCommanderForPlatform.register(parentPlatform, sipTransactionInfo, www, null, null, platformRegisterInfo.isRegister());
|
sipCommanderForPlatform.register(platform, sipTransactionInfo, www, null, null, subscribe.getSipTransactionInfo().getExpires() > 0);
|
||||||
} catch (SipException | InvalidArgumentException | ParseException e) {
|
} catch (SipException | InvalidArgumentException | ParseException e) {
|
||||||
log.error("[命令发送失败] 国标级联 再次注册: {}", e.getMessage());
|
log.error("[命令发送失败] 国标级联 再次注册: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
}else if (response.getStatusCode() == Response.OK){
|
}else if (response.getStatusCode() == Response.OK){
|
||||||
if (platformRegisterInfo.isRegister()) {
|
if (subscribe.getSipTransactionInfo().getExpires() > 0) {
|
||||||
SipTransactionInfo sipTransactionInfo = new SipTransactionInfo(response);
|
SipTransactionInfo sipTransactionInfo = new SipTransactionInfo(response);
|
||||||
platformService.online(parentPlatform, sipTransactionInfo);
|
platformService.online(platform, sipTransactionInfo);
|
||||||
}else {
|
}else {
|
||||||
platformService.offline(parentPlatform, true);
|
platformService.offline(platform);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 注册/注销成功移除缓存的信息
|
|
||||||
redisCatchStorage.delPlatformRegisterInfo(callId);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -231,6 +231,9 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
|
|||||||
RedisRpcRequest request = buildRequest("platform/update", platform);
|
RedisRpcRequest request = buildRequest("platform/update", platform);
|
||||||
request.setToId(serverId);
|
request.setToId(serverId);
|
||||||
RedisRpcResponse response = redisRpcConfig.request(request, 40, TimeUnit.MILLISECONDS);
|
RedisRpcResponse response = redisRpcConfig.request(request, 40, TimeUnit.MILLISECONDS);
|
||||||
|
if(response == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
return Boolean.parseBoolean(response.getBody().toString());
|
return Boolean.parseBoolean(response.getBody().toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.media.bean.MediaServer;
|
|||||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
|
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
|
||||||
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
|
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
|
||||||
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
|
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
|
||||||
import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -23,16 +22,6 @@ public interface IRedisCatchStorage {
|
|||||||
*/
|
*/
|
||||||
Long getCSEQ();
|
Long getCSEQ();
|
||||||
|
|
||||||
PlatformCatch queryPlatformCatchInfo(String platformGbId);
|
|
||||||
|
|
||||||
void delPlatformCatchInfo(String platformGbId);
|
|
||||||
|
|
||||||
void updatePlatformRegisterInfo(String callId, PlatformRegisterInfo platformRegisterInfo);
|
|
||||||
|
|
||||||
PlatformRegisterInfo queryPlatformRegisterInfo(String callId);
|
|
||||||
|
|
||||||
void delPlatformRegisterInfo(String callId);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 在redis添加wvp的信息
|
* 在redis添加wvp的信息
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -15,7 +15,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
|
|||||||
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
|
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
|
||||||
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
|
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
|
||||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||||
import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
|
|
||||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||||
import com.genersoft.iot.vmp.utils.JsonUtil;
|
import com.genersoft.iot.vmp.utils.JsonUtil;
|
||||||
import com.genersoft.iot.vmp.utils.SystemInfoUtils;
|
import com.genersoft.iot.vmp.utils.SystemInfoUtils;
|
||||||
@ -73,35 +72,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
|
|||||||
redisTemplate.opsForValue().set(key, 1);
|
redisTemplate.opsForValue().set(key, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public PlatformCatch queryPlatformCatchInfo(String platformGbId) {
|
|
||||||
return (PlatformCatch)redisTemplate.opsForValue().get(VideoManagerConstants.PLATFORM_CATCH_PREFIX + userSetting.getServerId() + "_" + platformGbId);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void delPlatformCatchInfo(String platformGbId) {
|
|
||||||
redisTemplate.delete(VideoManagerConstants.PLATFORM_CATCH_PREFIX + userSetting.getServerId() + "_" + platformGbId);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void updatePlatformRegisterInfo(String callId, PlatformRegisterInfo platformRegisterInfo) {
|
|
||||||
String key = VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetting.getServerId() + "_" + callId;
|
|
||||||
Duration duration = Duration.ofSeconds(30L);
|
|
||||||
redisTemplate.opsForValue().set(key, platformRegisterInfo, duration);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public PlatformRegisterInfo queryPlatformRegisterInfo(String callId) {
|
|
||||||
return (PlatformRegisterInfo)redisTemplate.opsForValue().get(VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetting.getServerId() + "_" + callId);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void delPlatformRegisterInfo(String callId) {
|
|
||||||
redisTemplate.delete(VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetting.getServerId() + "_" + callId);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateWVPInfo(ServerInfo serverInfo, int time) {
|
public void updateWVPInfo(ServerInfo serverInfo, int time) {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user