mirror of
https://gitee.com/pan648540858/wvp-GB28181-pro.git
synced 2026-05-24 22:17:49 +08:00
国标级联调整注册逻辑
This commit is contained in:
parent
7bb0cc19f4
commit
b0b5a0f5e0
@ -1,71 +0,0 @@
|
|||||||
package com.genersoft.iot.vmp.conf;
|
|
||||||
|
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.Platform;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatch;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
|
|
||||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.boot.CommandLineRunner;
|
|
||||||
import org.springframework.core.annotation.Order;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 系统启动时控制上级平台重新注册
|
|
||||||
* @author lin
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
@Component
|
|
||||||
@Order(value=13)
|
|
||||||
public class SipPlatformRunner implements CommandLineRunner {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private IRedisCatchStorage redisCatchStorage;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private IPlatformService platformService;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private ISIPCommanderForPlatform sipCommanderForPlatform;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private UserSetting userSetting;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run(String... args) throws Exception {
|
|
||||||
// 获取所有启用的平台
|
|
||||||
List<Platform> parentPlatforms = platformService.queryEnablePlatformList(userSetting.getServerId());
|
|
||||||
|
|
||||||
for (Platform platform : parentPlatforms) {
|
|
||||||
|
|
||||||
PlatformCatch platformCatchOld = redisCatchStorage.queryPlatformCatchInfo(platform.getServerGBId());
|
|
||||||
|
|
||||||
// 更新缓存
|
|
||||||
PlatformCatch platformCatch = new PlatformCatch();
|
|
||||||
platformCatch.setPlatform(platform);
|
|
||||||
platformCatch.setId(platform.getServerGBId());
|
|
||||||
redisCatchStorage.updatePlatformCatchInfo(platformCatch);
|
|
||||||
if (platformCatchOld != null) {
|
|
||||||
// 取消订阅
|
|
||||||
try {
|
|
||||||
log.info("[平台主动注销] {}({})", platform.getName(), platform.getServerGBId());
|
|
||||||
sipCommanderForPlatform.unregister(platform, platformCatchOld.getSipTransactionInfo(), null, (eventResult)->{
|
|
||||||
platformService.login(platform);
|
|
||||||
});
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("[命令发送失败] 国标级联 注销: {}", e.getMessage());
|
|
||||||
platformService.offline(platform, true);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}else {
|
|
||||||
platformService.login(platform);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 设置平台离线
|
|
||||||
platformService.offline(platform, false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -0,0 +1,5 @@
|
|||||||
|
package com.genersoft.iot.vmp.gb28181.bean;
|
||||||
|
|
||||||
|
public interface PlatformKeepaliveCallback {
|
||||||
|
public void run(String platformServerGbId, int failCount);
|
||||||
|
}
|
||||||
@ -123,23 +123,27 @@ public interface DeviceMapper {
|
|||||||
@Update(value = {" <script>" +
|
@Update(value = {" <script>" +
|
||||||
"UPDATE wvp_device " +
|
"UPDATE wvp_device " +
|
||||||
"SET update_time=#{updateTime}" +
|
"SET update_time=#{updateTime}" +
|
||||||
"<if test=\"name != null\">, name=#{name}</if>" +
|
", name=#{name}" +
|
||||||
"<if test=\"manufacturer != null\">, manufacturer=#{manufacturer}</if>" +
|
", manufacturer=#{manufacturer}" +
|
||||||
"<if test=\"model != null\">, model=#{model}</if>" +
|
", model=#{model}" +
|
||||||
"<if test=\"firmware != null\">, firmware=#{firmware}</if>" +
|
", firmware=#{firmware}" +
|
||||||
"<if test=\"transport != null\">, transport=#{transport}</if>" +
|
", transport=#{transport}" +
|
||||||
"<if test=\"ip != null\">, ip=#{ip}</if>" +
|
", ip=#{ip}" +
|
||||||
"<if test=\"localIp != null\">, local_ip=#{localIp}</if>" +
|
", local_ip=#{localIp}" +
|
||||||
"<if test=\"port != null\">, port=#{port}</if>" +
|
", port=#{port}" +
|
||||||
"<if test=\"hostAddress != null\">, host_address=#{hostAddress}</if>" +
|
", host_address=#{hostAddress}" +
|
||||||
"<if test=\"onLine != null\">, on_line=#{onLine}</if>" +
|
", on_line=#{onLine}" +
|
||||||
"<if test=\"registerTime != null\">, register_time=#{registerTime}</if>" +
|
", register_time=#{registerTime}" +
|
||||||
"<if test=\"keepaliveTime != null\">, keepalive_time=#{keepaliveTime}</if>" +
|
", keepalive_time=#{keepaliveTime}" +
|
||||||
"<if test=\"heartBeatInterval != null\">, heart_beat_interval=#{heartBeatInterval}</if>" +
|
", heart_beat_interval=#{heartBeatInterval}" +
|
||||||
"<if test=\"positionCapability != null\">, position_capability=#{positionCapability}</if>" +
|
", position_capability=#{positionCapability}" +
|
||||||
"<if test=\"heartBeatCount != null\">, heart_beat_count=#{heartBeatCount}</if>" +
|
", heart_beat_count=#{heartBeatCount}" +
|
||||||
"<if test=\"expires != null\">, expires=#{expires}</if>" +
|
", subscribe_cycle_for_catalog=#{subscribeCycleForCatalog}" +
|
||||||
"<if test=\"serverId != null\">, server_id=#{serverId}</if>" +
|
", subscribe_cycle_for_mobile_position=#{subscribeCycleForMobilePosition}" +
|
||||||
|
", mobile_position_submission_interval=#{mobilePositionSubmissionInterval}" +
|
||||||
|
", subscribe_cycle_for_alarm=#{subscribeCycleForAlarm}" +
|
||||||
|
", expires=#{expires}" +
|
||||||
|
", server_id=#{serverId}" +
|
||||||
"WHERE device_id=#{deviceId}"+
|
"WHERE device_id=#{deviceId}"+
|
||||||
" </script>"})
|
" </script>"})
|
||||||
int update(Device device);
|
int update(Device device);
|
||||||
|
|||||||
@ -78,7 +78,7 @@ public interface PlatformMapper {
|
|||||||
List<Platform> queryList(@Param("query") String query);
|
List<Platform> queryList(@Param("query") String query);
|
||||||
|
|
||||||
@Select("SELECT * FROM wvp_platform WHERE server_id=#{serverId} and enable=#{enable} ")
|
@Select("SELECT * FROM wvp_platform WHERE server_id=#{serverId} and enable=#{enable} ")
|
||||||
List<Platform> queryEnableParentPlatformList(@Param("serverId") String serverId, @Param("enable") boolean enable);
|
List<Platform> queryEnableParentPlatformListByServerId(@Param("serverId") String serverId, @Param("enable") boolean enable);
|
||||||
|
|
||||||
@Select("SELECT * FROM wvp_platform WHERE enable=true and as_message_channel=true")
|
@Select("SELECT * FROM wvp_platform WHERE enable=true and as_message_channel=true")
|
||||||
List<Platform> queryEnablePlatformListWithAsMessageChannel();
|
List<Platform> queryEnablePlatformListWithAsMessageChannel();
|
||||||
@ -89,8 +89,8 @@ public interface PlatformMapper {
|
|||||||
@Select("SELECT * FROM wvp_platform WHERE id=#{id}")
|
@Select("SELECT * FROM wvp_platform WHERE id=#{id}")
|
||||||
Platform query(int id);
|
Platform query(int id);
|
||||||
|
|
||||||
@Update("UPDATE wvp_platform SET status=#{online} WHERE server_gb_id=#{platformGbID}" )
|
@Update("UPDATE wvp_platform SET status=#{online} WHERE id=#{id}" )
|
||||||
int updateStatus(@Param("platformGbID") String platformGbID, @Param("online") boolean online);
|
int updateStatus(@Param("id") int id, @Param("online") boolean online);
|
||||||
|
|
||||||
@Select("SELECT server_id FROM wvp_platform WHERE enable=true and server_id != #{serverId} group by server_id")
|
@Select("SELECT server_id FROM wvp_platform WHERE enable=true and server_id != #{serverId} group by server_id")
|
||||||
List<String> queryServerIdsWithEnableAndNotInServer(@Param("serverId") String serverId);
|
List<String> queryServerIdsWithEnableAndNotInServer(@Param("serverId") String serverId);
|
||||||
@ -100,4 +100,11 @@ public interface PlatformMapper {
|
|||||||
|
|
||||||
@Select("SELECT * FROM wvp_platform ")
|
@Select("SELECT * FROM wvp_platform ")
|
||||||
List<Platform> queryAll();
|
List<Platform> queryAll();
|
||||||
|
|
||||||
|
@Select("SELECT * FROM wvp_platform WHERE enable=true and server_id == #{serverId} group by server_id")
|
||||||
|
List<Platform> queryServerIdsWithEnableAndServer(@Param("serverId") String serverId);
|
||||||
|
|
||||||
|
@Update("UPDATE wvp_platform SET status=false" )
|
||||||
|
void offlineAll();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
package com.genersoft.iot.vmp.gb28181.event.sip;
|
package com.genersoft.iot.vmp.gb28181.event.sip;
|
||||||
|
|
||||||
|
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
|
||||||
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
|
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
@ -27,6 +28,8 @@ public class SipEvent implements Delayed {
|
|||||||
*/
|
*/
|
||||||
private long delay;
|
private long delay;
|
||||||
|
|
||||||
|
private SipTransactionInfo sipTransactionInfo;
|
||||||
|
|
||||||
public static SipEvent getInstance(String key, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, long delay) {
|
public static SipEvent getInstance(String key, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, long delay) {
|
||||||
SipEvent sipEvent = new SipEvent();
|
SipEvent sipEvent = new SipEvent();
|
||||||
sipEvent.setKey(key);
|
sipEvent.setKey(key);
|
||||||
|
|||||||
@ -55,12 +55,6 @@ public interface IPlatformService {
|
|||||||
*/
|
*/
|
||||||
void offline(Platform parentPlatform, boolean stopRegisterTask);
|
void offline(Platform parentPlatform, boolean stopRegisterTask);
|
||||||
|
|
||||||
/**
|
|
||||||
* 向上级平台发起注册
|
|
||||||
* @param parentPlatform
|
|
||||||
*/
|
|
||||||
void login(Platform parentPlatform);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 向上级平台发送位置订阅
|
* 向上级平台发送位置订阅
|
||||||
* @param platformId 平台
|
* @param platformId 平台
|
||||||
|
|||||||
@ -344,7 +344,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean addCatalogSubscribe(Device device, SipTransactionInfo transactionInfo) {
|
public boolean addCatalogSubscribe(@NotNull Device device, SipTransactionInfo transactionInfo) {
|
||||||
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
|
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -371,23 +371,13 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
});
|
});
|
||||||
} catch (InvalidArgumentException | SipException | ParseException e) {
|
} catch (InvalidArgumentException | SipException | ParseException e) {
|
||||||
log.error("[命令发送失败] 目录订阅: {}", e.getMessage());
|
log.error("[命令发送失败] 目录订阅: {}", e.getMessage());
|
||||||
|
return false;
|
||||||
} finally {
|
|
||||||
// 无论是否发起成功,都保存起来,如果失败后续任务会继续订阅
|
|
||||||
deviceMapper.updateSubscribeCatalog(device);
|
|
||||||
redisCatchStorage.updateDevice(device);
|
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean removeCatalogSubscribe(Device device, CommonCallback<Boolean> callback) {
|
public boolean removeCatalogSubscribe(@NotNull Device device, CommonCallback<Boolean> callback) {
|
||||||
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
|
|
||||||
if (callback != null) {
|
|
||||||
callback.run(false);
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
log.info("[移除目录订阅]: {}", device.getDeviceId());
|
log.info("[移除目录订阅]: {}", device.getDeviceId());
|
||||||
String key = SubscribeTaskForCatalog.getKey(device);
|
String key = SubscribeTaskForCatalog.getKey(device);
|
||||||
if (subscribeTaskRunner.containsKey(key)) {
|
if (subscribeTaskRunner.containsKey(key)) {
|
||||||
@ -396,6 +386,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
log.warn("[移除目录订阅] 未找到事务信息,{}", device.getDeviceId());
|
log.warn("[移除目录订阅] 未找到事务信息,{}", device.getDeviceId());
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
device.setSubscribeCycleForCatalog(0);
|
||||||
sipCommander.catalogSubscribe(device, transactionInfo, eventResult -> {
|
sipCommander.catalogSubscribe(device, transactionInfo, eventResult -> {
|
||||||
// 成功
|
// 成功
|
||||||
log.info("[取消目录订阅]成功: {}", device.getDeviceId());
|
log.info("[取消目录订阅]成功: {}", device.getDeviceId());
|
||||||
@ -410,20 +401,13 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
}catch (Exception e) {
|
}catch (Exception e) {
|
||||||
// 失败
|
// 失败
|
||||||
log.warn("[取消目录订阅]失败: {}-{} ", device.getDeviceId(), e.getMessage());
|
log.warn("[取消目录订阅]失败: {}-{} ", device.getDeviceId(), e.getMessage());
|
||||||
}finally {
|
|
||||||
// 无论是否发起成功,都保存起来,如果失败,到期后将不再发起
|
|
||||||
deviceMapper.updateSubscribeCatalog(device);
|
|
||||||
redisCatchStorage.updateDevice(device);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean addMobilePositionSubscribe(Device device, SipTransactionInfo transactionInfo) {
|
public boolean addMobilePositionSubscribe(@NotNull Device device, SipTransactionInfo transactionInfo) {
|
||||||
if (device == null || device.getSubscribeCycleForMobilePosition() < 0) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
log.info("[添加移动位置订阅] 设备 {}", device.getDeviceId());
|
log.info("[添加移动位置订阅] 设备 {}", device.getDeviceId());
|
||||||
try {
|
try {
|
||||||
sipCommander.mobilePositionSubscribe(device, transactionInfo, eventResult -> {
|
sipCommander.mobilePositionSubscribe(device, transactionInfo, eventResult -> {
|
||||||
@ -447,24 +431,14 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
});
|
});
|
||||||
} catch (InvalidArgumentException | SipException | ParseException e) {
|
} catch (InvalidArgumentException | SipException | ParseException e) {
|
||||||
log.error("[命令发送失败] 移动位置订阅: {}", e.getMessage());
|
log.error("[命令发送失败] 移动位置订阅: {}", e.getMessage());
|
||||||
}finally {
|
return false;
|
||||||
// 无论是否发起成功,都保存起来,如果失败后续任务会继续订阅
|
|
||||||
deviceMapper.updateSubscribeMobilePosition(device);
|
|
||||||
redisCatchStorage.updateDevice(device);
|
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback) {
|
public boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback) {
|
||||||
if (device == null || device.getSubscribeCycleForMobilePosition() < 0) {
|
|
||||||
if (callback != null) {
|
|
||||||
callback.run(false);
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
log.info("[移除移动位置订阅]: {}", device.getDeviceId());
|
log.info("[移除移动位置订阅]: {}", device.getDeviceId());
|
||||||
device.setSubscribeCycleForMobilePosition(0);
|
|
||||||
String key = SubscribeTaskForMobilPosition.getKey(device);
|
String key = SubscribeTaskForMobilPosition.getKey(device);
|
||||||
if (subscribeTaskRunner.containsKey(key)) {
|
if (subscribeTaskRunner.containsKey(key)) {
|
||||||
SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key);
|
SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key);
|
||||||
@ -472,6 +446,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
log.warn("[移除移动位置订阅] 未找到事务信息,{}", device.getDeviceId());
|
log.warn("[移除移动位置订阅] 未找到事务信息,{}", device.getDeviceId());
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
device.setSubscribeCycleForMobilePosition(0);
|
||||||
sipCommander.mobilePositionSubscribe(device, transactionInfo, eventResult -> {
|
sipCommander.mobilePositionSubscribe(device, transactionInfo, eventResult -> {
|
||||||
// 成功
|
// 成功
|
||||||
log.info("[取消移动位置订阅]成功: {}", device.getDeviceId());
|
log.info("[取消移动位置订阅]成功: {}", device.getDeviceId());
|
||||||
@ -486,10 +461,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
}catch (Exception e) {
|
}catch (Exception e) {
|
||||||
// 失败
|
// 失败
|
||||||
log.warn("[取消移动位置订阅]失败: {}-{} ", device.getDeviceId(), e.getMessage());
|
log.warn("[取消移动位置订阅]失败: {}-{} ", device.getDeviceId(), e.getMessage());
|
||||||
}finally {
|
|
||||||
// 无论是否发起成功,都保存起来,如果失败,到期后将不再发起
|
|
||||||
deviceMapper.updateSubscribeMobilePosition(device);
|
|
||||||
redisCatchStorage.updateDevice(device);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
@ -645,10 +616,10 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
public boolean delete(String deviceId) {
|
public boolean delete(String deviceId) {
|
||||||
Device device = getDeviceByDeviceIdFromDb(deviceId);
|
Device device = getDeviceByDeviceIdFromDb(deviceId);
|
||||||
Assert.notNull(device, "未找到设备");
|
Assert.notNull(device, "未找到设备");
|
||||||
if (device.getSubscribeCycleForCatalog() > 0) {
|
if (subscribeTaskRunner.containsKey(SubscribeTaskForCatalog.getKey(device))) {
|
||||||
removeCatalogSubscribe(device, null);
|
removeCatalogSubscribe(device, null);
|
||||||
}
|
}
|
||||||
if (device.getSubscribeCycleForMobilePosition() > 0) {
|
if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
|
||||||
removeMobilePositionSubscribe(device, null);
|
removeMobilePositionSubscribe(device, null);
|
||||||
}
|
}
|
||||||
// 停止状态检测
|
// 停止状态检测
|
||||||
@ -718,6 +689,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
// 订阅周期不同,则先取消
|
// 订阅周期不同,则先取消
|
||||||
removeCatalogSubscribe(device, result->{
|
removeCatalogSubscribe(device, result->{
|
||||||
device.setSubscribeCycleForCatalog(cycle);
|
device.setSubscribeCycleForCatalog(cycle);
|
||||||
|
updateDevice(device);
|
||||||
if (cycle > 0) {
|
if (cycle > 0) {
|
||||||
// 开启订阅
|
// 开启订阅
|
||||||
addCatalogSubscribe(device, null);
|
addCatalogSubscribe(device, null);
|
||||||
@ -726,6 +698,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
}else {
|
}else {
|
||||||
// 开启订阅
|
// 开启订阅
|
||||||
device.setSubscribeCycleForCatalog(cycle);
|
device.setSubscribeCycleForCatalog(cycle);
|
||||||
|
updateDevice(device);
|
||||||
addCatalogSubscribe(device, null);
|
addCatalogSubscribe(device, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -14,6 +14,7 @@ import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
|
|||||||
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
|
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
|
||||||
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
|
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
|
||||||
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
|
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.task.platformStatus.PlatformKeepaliveTask;
|
||||||
import com.genersoft.iot.vmp.gb28181.task.platformStatus.PlatformRegisterTask;
|
import com.genersoft.iot.vmp.gb28181.task.platformStatus.PlatformRegisterTask;
|
||||||
import com.genersoft.iot.vmp.gb28181.task.platformStatus.PlatformRegisterTaskInfo;
|
import com.genersoft.iot.vmp.gb28181.task.platformStatus.PlatformRegisterTaskInfo;
|
||||||
import com.genersoft.iot.vmp.gb28181.task.platformStatus.PlatformStatusTaskRunner;
|
import com.genersoft.iot.vmp.gb28181.task.platformStatus.PlatformStatusTaskRunner;
|
||||||
@ -30,7 +31,6 @@ import com.genersoft.iot.vmp.service.ISendRtpServerService;
|
|||||||
import com.genersoft.iot.vmp.service.bean.*;
|
import com.genersoft.iot.vmp.service.bean.*;
|
||||||
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
|
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
|
||||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
|
||||||
import com.github.pagehelper.PageHelper;
|
import com.github.pagehelper.PageHelper;
|
||||||
import com.github.pagehelper.PageInfo;
|
import com.github.pagehelper.PageInfo;
|
||||||
import gov.nist.javax.sip.message.SIPResponse;
|
import gov.nist.javax.sip.message.SIPResponse;
|
||||||
@ -72,7 +72,6 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
@Autowired
|
@Autowired
|
||||||
private IRedisCatchStorage redisCatchStorage;
|
private IRedisCatchStorage redisCatchStorage;
|
||||||
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private SSRCFactory ssrcFactory;
|
private SSRCFactory ssrcFactory;
|
||||||
|
|
||||||
@ -125,16 +124,49 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
for (PlatformRegisterTaskInfo taskInfo : registerTaskInfoList) {
|
for (PlatformRegisterTaskInfo taskInfo : registerTaskInfoList) {
|
||||||
log.info("[国标级联] 启动服务是发现平台注册仍在有效期,注销: {}", taskInfo.getPlatformServerId());
|
log.info("[国标级联] 启动服务是发现平台注册仍在有效期,注销: {}", taskInfo.getPlatformServerId());
|
||||||
Platform platform = queryPlatformByServerGBId(taskInfo.getPlatformServerId());
|
Platform platform = queryPlatformByServerGBId(taskInfo.getPlatformServerId());
|
||||||
commanderForPlatform.unregister(platform, taskInfo.getSipTransactionInfo(), null, eventResult -> {
|
if (platform == null) {
|
||||||
log.info("[国标级联] 注销成功, 平台:{}", taskInfo.getPlatformServerId());
|
statusTaskRunner.removeRegisterTask(taskInfo.getPlatformServerId());
|
||||||
});
|
continue;
|
||||||
|
}
|
||||||
|
sendUnRegister(platform, taskInfo.getSipTransactionInfo());
|
||||||
|
}
|
||||||
|
// 启动时所有平台默认离线
|
||||||
|
platformMapper.offlineAll();
|
||||||
|
}
|
||||||
|
@Scheduled(fixedDelay = 20, timeUnit = TimeUnit.SECONDS) //每3秒执行一次
|
||||||
|
public void statusLostCheck(){
|
||||||
|
// 每隔20秒检测,是否存在启用但是未注册的平台,存在则发起注册
|
||||||
|
// 获取所有在线并且启用的平台
|
||||||
|
List<Platform> platformList = platformMapper.queryServerIdsWithEnableAndServer(userSetting.getServerId());
|
||||||
|
if (platformList.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (Platform platform : platformList) {
|
||||||
|
sendRegister(platform, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void sendRegister(Platform platform, SipTransactionInfo sipTransactionInfo) {
|
||||||
|
try {
|
||||||
|
commanderForPlatform.register(platform, sipTransactionInfo, eventResult -> {
|
||||||
|
log.info("[国标级联] {}({}),添加向上级注册失败,请确定上级平台可用时重新保存", platform.getName(), platform.getServerGBId());
|
||||||
|
}, null);
|
||||||
|
} catch (InvalidArgumentException | ParseException | SipException e) {
|
||||||
|
log.error("[命令发送失败] 国标级联: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO 每隔20秒检测,是否存在启用但是未注册的平台,存在则发起注册
|
private void sendUnRegister(Platform platform, SipTransactionInfo sipTransactionInfo) {
|
||||||
// TODO 平台注册成功通知处理
|
statusTaskRunner.removeRegisterTask(platform.getServerGBId());
|
||||||
// TODO 平台注销成功通知处理
|
statusTaskRunner.removeKeepAliveTask(platform.getServerGBId());
|
||||||
|
try {
|
||||||
|
commanderForPlatform.unregister(platform, sipTransactionInfo, null, eventResult -> {
|
||||||
|
log.info("[国标级联] 注销成功, 平台:{}", platform.getServerGBId());
|
||||||
|
});
|
||||||
|
} catch (InvalidArgumentException | ParseException | SipException e) {
|
||||||
|
log.error("[命令发送失败] 国标级联: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 定时监听国标级联所进行的WVP服务是否正常, 如果异常则选择新的wvp执行
|
// 定时监听国标级联所进行的WVP服务是否正常, 如果异常则选择新的wvp执行
|
||||||
@Scheduled(fixedDelay = 2, timeUnit = TimeUnit.SECONDS) //每3秒执行一次
|
@Scheduled(fixedDelay = 2, timeUnit = TimeUnit.SECONDS) //每3秒执行一次
|
||||||
@ -166,20 +198,26 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
platform.setAddress(getIpWithSameNetwork(platform.getAddress()));
|
platform.setAddress(getIpWithSameNetwork(platform.getAddress()));
|
||||||
platform.setServerId(userSetting.getServerId());
|
platform.setServerId(userSetting.getServerId());
|
||||||
platformMapper.update(platform);
|
platformMapper.update(platform);
|
||||||
// 更新redis
|
// 检查就平台是否注册到期,没有则注销,由本平台重新注册
|
||||||
redisCatchStorage.delPlatformCatchInfo(platform.getServerGBId());
|
List<PlatformRegisterTaskInfo> taskInfoList = statusTaskRunner.getRegisterTransactionInfoByServerId(serverId);
|
||||||
PlatformCatch platformCatch = new PlatformCatch();
|
boolean needUnregister = false;
|
||||||
platformCatch.setPlatform(platform);
|
SipTransactionInfo sipTransactionInfo = null;
|
||||||
platformCatch.setId(platform.getServerGBId());
|
if (!taskInfoList.isEmpty()) {
|
||||||
redisCatchStorage.updatePlatformCatchInfo(platformCatch);
|
for (PlatformRegisterTaskInfo taskInfo : taskInfoList) {
|
||||||
// 开始注册
|
if (taskInfo.getPlatformServerId().equals(platform.getServerGBId())
|
||||||
// 注册成功时由程序直接调用了online方法
|
&& taskInfo.getSipTransactionInfo() != null) {
|
||||||
try {
|
needUnregister = true;
|
||||||
commanderForPlatform.register(platform, eventResult -> {
|
sipTransactionInfo = taskInfo.getSipTransactionInfo();
|
||||||
log.info("[国标级联] {}({}),添加向上级注册失败,请确定上级平台可用时重新保存", platform.getName(), platform.getServerGBId());
|
break;
|
||||||
}, null);
|
}
|
||||||
} catch (InvalidArgumentException | ParseException | SipException e) {
|
}
|
||||||
log.error("[命令发送失败] 国标级联: {}", e.getMessage());
|
}
|
||||||
|
if (needUnregister) {
|
||||||
|
sendUnRegister(platform, sipTransactionInfo);
|
||||||
|
}else {
|
||||||
|
// 开始注册
|
||||||
|
// 注册成功时由程序直接调用了online方法
|
||||||
|
sendRegister(platform, null);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@ -292,21 +330,11 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
}
|
}
|
||||||
platform.setServerId(userSetting.getServerId());
|
platform.setServerId(userSetting.getServerId());
|
||||||
int result = platformMapper.add(platform);
|
int result = platformMapper.add(platform);
|
||||||
// 添加缓存
|
|
||||||
PlatformCatch platformCatch = new PlatformCatch();
|
|
||||||
platformCatch.setPlatform(platform);
|
|
||||||
platformCatch.setId(platform.getServerGBId());
|
|
||||||
redisCatchStorage.updatePlatformCatchInfo(platformCatch);
|
|
||||||
if (platform.isEnable()) {
|
if (platform.isEnable()) {
|
||||||
// 保存时启用就发送注册
|
// 保存时启用就发送注册
|
||||||
// 注册成功时由程序直接调用了online方法
|
// 注册成功时由程序直接调用了online方法
|
||||||
try {
|
sendRegister(platform, null);
|
||||||
commanderForPlatform.register(platform, eventResult -> {
|
|
||||||
log.info("[国标级联] {}({}),添加向上级注册失败,请确定上级平台可用时重新保存", platform.getName(), platform.getServerGBId());
|
|
||||||
}, null);
|
|
||||||
} catch (InvalidArgumentException | ParseException | SipException e) {
|
|
||||||
log.error("[命令发送失败] 国标级联: {}", e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return result > 0;
|
return result > 0;
|
||||||
}
|
}
|
||||||
@ -321,52 +349,15 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
if (!userSetting.getServerId().equals(platformInDb.getServerId())) {
|
if (!userSetting.getServerId().equals(platformInDb.getServerId())) {
|
||||||
return redisRpcService.updatePlatform(platformInDb.getServerId(), platform);
|
return redisRpcService.updatePlatform(platformInDb.getServerId(), platform);
|
||||||
}
|
}
|
||||||
|
|
||||||
PlatformCatch platformCatchOld = redisCatchStorage.queryPlatformCatchInfo(platformInDb.getServerGBId());
|
|
||||||
platform.setUpdateTime(DateUtil.getNow());
|
|
||||||
|
|
||||||
// 停止心跳定时
|
|
||||||
final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + platformInDb.getServerGBId();
|
|
||||||
dynamicTask.stop(keepaliveTaskKey);
|
|
||||||
// 停止注册定时
|
|
||||||
final String registerTaskKey = REGISTER_KEY_PREFIX + platformInDb.getServerGBId();
|
|
||||||
dynamicTask.stop(registerTaskKey);
|
|
||||||
// 注销旧的
|
|
||||||
try {
|
|
||||||
if (platformInDb.isStatus() && platformCatchOld != null) {
|
|
||||||
log.info("保存平台{}时发现旧平台在线,发送注销命令", platformInDb.getServerGBId());
|
|
||||||
commanderForPlatform.unregister(platformInDb, platformCatchOld.getSipTransactionInfo(), null, eventResult -> {
|
|
||||||
log.info("[国标级联] 注销成功, 平台:{}", platformInDb.getServerGBId());
|
|
||||||
});
|
|
||||||
}
|
|
||||||
} catch (InvalidArgumentException | ParseException | SipException e) {
|
|
||||||
log.error("[命令发送失败] 国标级联 注销: {}", e.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
// 更新数据库
|
// 更新数据库
|
||||||
if (platform.getCatalogGroup() == 0) {
|
if (platform.getCatalogGroup() == 0) {
|
||||||
platform.setCatalogGroup(1);
|
platform.setCatalogGroup(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
platformMapper.update(platform);
|
platformMapper.update(platform);
|
||||||
// 更新redis
|
if (statusTaskRunner.containsRegister(platformInDb.getServerGBId())) {
|
||||||
redisCatchStorage.delPlatformCatchInfo(platformInDb.getServerGBId());
|
SipTransactionInfo transactionInfo = statusTaskRunner.getRegisterTransactionInfo(platformInDb.getServerGBId());
|
||||||
PlatformCatch platformCatch = new PlatformCatch();
|
// 注销后出发平台离线, 如果是启用的平台,那么下次丢失检测会检测到并重新注册上线
|
||||||
platformCatch.setPlatform(platform);
|
sendUnRegister(platformInDb, transactionInfo);
|
||||||
platformCatch.setId(platform.getServerGBId());
|
|
||||||
redisCatchStorage.updatePlatformCatchInfo(platformCatch);
|
|
||||||
// 注册
|
|
||||||
if (platform.isEnable()) {
|
|
||||||
// 保存时启用就发送注册
|
|
||||||
// 注册成功时由程序直接调用了online方法
|
|
||||||
try {
|
|
||||||
log.info("[国标级联] 平台注册 {}", platform.getDeviceGBId());
|
|
||||||
commanderForPlatform.register(platform, eventResult -> {
|
|
||||||
log.info("[国标级联] {},添加向上级注册失败,请确定上级平台可用时重新保存", platform.getServerGBId());
|
|
||||||
}, null);
|
|
||||||
} catch (InvalidArgumentException | ParseException | SipException e) {
|
|
||||||
log.error("[命令发送失败] 国标级联: {}", e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
@ -375,79 +366,22 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
@Override
|
@Override
|
||||||
public void online(Platform platform, SipTransactionInfo sipTransactionInfo) {
|
public void online(Platform platform, SipTransactionInfo sipTransactionInfo) {
|
||||||
log.info("[国标级联]:{}, 平台上线", platform.getServerGBId());
|
log.info("[国标级联]:{}, 平台上线", platform.getServerGBId());
|
||||||
final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + platform.getServerGBId();
|
PlatformRegisterTask registerTask = new PlatformRegisterTask(platform.getServerId(), platform.getExpires() * 1000L - 500L,
|
||||||
dynamicTask.stop(registerFailAgainTaskKey);
|
sipTransactionInfo, (platformServerGbId) -> {
|
||||||
|
this.registerExpire(platformServerGbId, sipTransactionInfo);
|
||||||
|
});
|
||||||
|
statusTaskRunner.addRegisterTask(registerTask);
|
||||||
|
|
||||||
platformMapper.updateStatus(platform.getServerGBId(), true);
|
PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerId(), platform.getKeepTimeout() * 1000L,
|
||||||
PlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(platform.getServerGBId());
|
this::keepaliveExpire);
|
||||||
if (platformCatch == null) {
|
statusTaskRunner.addKeepAliveTask(keepaliveTask);
|
||||||
platformCatch = new PlatformCatch();
|
|
||||||
platformCatch.setPlatform(platform);
|
|
||||||
platformCatch.setId(platform.getServerGBId());
|
|
||||||
platform.setStatus(true);
|
|
||||||
platformCatch.setPlatform(platform);
|
|
||||||
}
|
|
||||||
|
|
||||||
platformCatch.getPlatform().setStatus(true);
|
platformMapper.updateStatus(platform.getId(), true);
|
||||||
platformCatch.setSipTransactionInfo(sipTransactionInfo);
|
|
||||||
redisCatchStorage.updatePlatformCatchInfo(platformCatch);
|
|
||||||
|
|
||||||
final String registerTaskKey = REGISTER_KEY_PREFIX + platform.getServerGBId();
|
|
||||||
if (!dynamicTask.isAlive(registerTaskKey)) {
|
|
||||||
log.info("[国标级联]:{}, 添加定时注册任务", platform.getServerGBId());
|
|
||||||
// 添加注册任务
|
|
||||||
dynamicTask.startCron(registerTaskKey,
|
|
||||||
// 注册失败(注册成功时由程序直接调用了online方法)
|
|
||||||
()-> registerTask(platform, sipTransactionInfo),
|
|
||||||
platform.getExpires() * 1000);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + platform.getServerGBId();
|
|
||||||
if (!dynamicTask.contains(keepaliveTaskKey)) {
|
|
||||||
log.info("[国标级联]:{}, 添加定时心跳任务", platform.getServerGBId());
|
|
||||||
// 添加心跳任务
|
|
||||||
dynamicTask.startCron(keepaliveTaskKey,
|
|
||||||
()-> {
|
|
||||||
try {
|
|
||||||
commanderForPlatform.keepalive(platform, eventResult -> {
|
|
||||||
// 心跳失败
|
|
||||||
if (eventResult.type != SipSubscribe.EventResultType.timeout) {
|
|
||||||
log.warn("[国标级联]发送心跳收到错误,code: {}, msg: {}", eventResult.statusCode, eventResult.msg);
|
|
||||||
}
|
|
||||||
// 心跳失败
|
|
||||||
PlatformCatch platformCatchForNow = redisCatchStorage.queryPlatformCatchInfo(platform.getServerGBId());
|
|
||||||
// 此时是第三次心跳超时, 平台离线
|
|
||||||
if (platformCatchForNow.getKeepAliveReply() == 2) {
|
|
||||||
// 设置平台离线,并重新注册
|
|
||||||
log.info("[国标级联] 三次心跳失败, 平台{}({})离线", platform.getName(), platform.getServerGBId());
|
|
||||||
offline(platform, false);
|
|
||||||
}else {
|
|
||||||
platformCatchForNow.setKeepAliveReply(platformCatchForNow.getKeepAliveReply() + 1);
|
|
||||||
redisCatchStorage.updatePlatformCatchInfo(platformCatchForNow);
|
|
||||||
}
|
|
||||||
|
|
||||||
}, eventResult -> {
|
|
||||||
// 心跳成功
|
|
||||||
// 清空之前的心跳超时计数
|
|
||||||
PlatformCatch platformCatchForNow = redisCatchStorage.queryPlatformCatchInfo(platform.getServerGBId());
|
|
||||||
if (platformCatchForNow != null && platformCatchForNow.getKeepAliveReply() > 0) {
|
|
||||||
platformCatchForNow.setKeepAliveReply(0);
|
|
||||||
redisCatchStorage.updatePlatformCatchInfo(platformCatchForNow);
|
|
||||||
}
|
|
||||||
log.info("[国标级联] 发送心跳,平台{}({}), code: {}, msg: {}", platform.getName(), platform.getServerGBId(), eventResult.statusCode, eventResult.msg);
|
|
||||||
});
|
|
||||||
} catch (SipException | InvalidArgumentException | ParseException e) {
|
|
||||||
log.error("[命令发送失败] 国标级联 发送心跳: {}", e.getMessage());
|
|
||||||
}
|
|
||||||
},
|
|
||||||
(platform.getKeepTimeout())*1000);
|
|
||||||
}
|
|
||||||
if (platform.getAutoPushChannel() != null && platform.getAutoPushChannel()) {
|
if (platform.getAutoPushChannel() != null && platform.getAutoPushChannel()) {
|
||||||
if (subscribeHolder.getCatalogSubscribe(platform.getServerGBId()) == null) {
|
if (subscribeHolder.getCatalogSubscribe(platform.getServerGBId()) == null) {
|
||||||
log.info("[国标级联]:{}, 添加自动通道推送模拟订阅信息", platform.getServerGBId());
|
log.info("[国标级联]:{}, 添加自动通道推送模拟订阅信息", platform.getServerGBId());
|
||||||
addSimulatedSubscribeInfo(platform);
|
addSimulatedSubscribeInfo(platform);
|
||||||
|
|
||||||
}
|
}
|
||||||
}else {
|
}else {
|
||||||
SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(platform.getServerGBId());
|
SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(platform.getServerGBId());
|
||||||
@ -457,6 +391,65 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 注册到期处理
|
||||||
|
*/
|
||||||
|
private void registerExpire(String platformServerId, SipTransactionInfo transactionInfo) {
|
||||||
|
log.info("[国标级联] 注册到期, 上级平台编号: {}", platformServerId);
|
||||||
|
Platform platform = queryPlatformByServerGBId(platformServerId);
|
||||||
|
if (platform == null || !platform.isEnable()) {
|
||||||
|
log.info("[国标级联] 注册到期, 上级平台编号: {}, 平台不存在或者未启用, 忽略", platformServerId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
sendRegister(platform, transactionInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void keepaliveExpire(String platformServerId, int failCount) {
|
||||||
|
log.info("[国标级联] 心跳到期, 上级平台编号: {}", platformServerId);
|
||||||
|
Platform platform = queryPlatformByServerGBId(platformServerId);
|
||||||
|
if (platform == null || !platform.isEnable()) {
|
||||||
|
log.info("[国标级联] 心跳到期, 上级平台编号: {}, 平台不存在或者未启用, 忽略", platformServerId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
commanderForPlatform.keepalive(platform, eventResult -> {
|
||||||
|
// 心跳失败
|
||||||
|
if (eventResult.type != SipSubscribe.EventResultType.timeout) {
|
||||||
|
log.warn("[国标级联] 发送心跳收到错误,code: {}, msg: {}", eventResult.statusCode, eventResult.msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 心跳超时失败
|
||||||
|
if (failCount < 2) {
|
||||||
|
log.info("[国标级联] 心跳发送超时, 平台服务编号: {}", platformServerId);
|
||||||
|
PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerId(), platform.getKeepTimeout() * 1000L,
|
||||||
|
this::keepaliveExpire);
|
||||||
|
keepaliveTask.setFailCount(failCount + 1);
|
||||||
|
statusTaskRunner.addKeepAliveTask(keepaliveTask);
|
||||||
|
}else {
|
||||||
|
// 心跳超时三次, 不再发送心跳, 平台离线
|
||||||
|
log.info("[国标级联] 心跳发送超时三次,平台离线, 平台服务编号: {}", platformServerId);
|
||||||
|
offline(platform, false);
|
||||||
|
}
|
||||||
|
}, eventResult -> {
|
||||||
|
PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerId(), platform.getKeepTimeout() * 1000L,
|
||||||
|
this::keepaliveExpire);
|
||||||
|
statusTaskRunner.addKeepAliveTask(keepaliveTask);
|
||||||
|
});
|
||||||
|
} catch (SipException | InvalidArgumentException | ParseException e) {
|
||||||
|
log.error("[命令发送失败] 国标级联 发送心跳: {}", e.getMessage());
|
||||||
|
if (failCount < 2) {
|
||||||
|
PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerId(), platform.getKeepTimeout() * 1000L,
|
||||||
|
this::keepaliveExpire);
|
||||||
|
keepaliveTask.setFailCount(failCount + 1);
|
||||||
|
statusTaskRunner.addKeepAliveTask(keepaliveTask);
|
||||||
|
}else {
|
||||||
|
// 心跳超时三次, 不再发送心跳, 平台离线
|
||||||
|
log.info("[国标级联] 心跳发送失败三次,平台离线, 平台服务编号: {}", platformServerId);
|
||||||
|
offline(platform, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addSimulatedSubscribeInfo(Platform platform) {
|
public void addSimulatedSubscribeInfo(Platform platform) {
|
||||||
// 自动添加一条模拟的订阅信息
|
// 自动添加一条模拟的订阅信息
|
||||||
@ -464,82 +457,20 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
SubscribeInfo.buildSimulated(platform.getServerGBId(), platform.getServerIp()));
|
SubscribeInfo.buildSimulated(platform.getServerGBId(), platform.getServerIp()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void registerTask(Platform platform, SipTransactionInfo sipTransactionInfo){
|
|
||||||
try {
|
|
||||||
// 不在同一个会话中续订则每次全新注册
|
|
||||||
if (!userSetting.isRegisterKeepIntDialog()) {
|
|
||||||
sipTransactionInfo = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sipTransactionInfo == null) {
|
|
||||||
log.info("[国标级联] 平台:{}注册即将到期,开始重新注册", platform.getServerGBId());
|
|
||||||
}else {
|
|
||||||
log.info("[国标级联] 平台:{}注册即将到期,开始续订", platform.getServerGBId());
|
|
||||||
}
|
|
||||||
|
|
||||||
commanderForPlatform.register(platform, sipTransactionInfo, eventResult -> {
|
|
||||||
log.info("[国标级联] 平台:{}注册失败,{}:{}", platform.getServerGBId(),
|
|
||||||
eventResult.statusCode, eventResult.msg);
|
|
||||||
if (platform.isStatus()) {
|
|
||||||
offline(platform, false);
|
|
||||||
}
|
|
||||||
}, null);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("[命令发送失败] 国标级联定时注册: {}", e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void offline(Platform platform, boolean stopRegister) {
|
public void offline(Platform platform, boolean stopRegister) {
|
||||||
log.info("[平台离线]:{}({})", platform.getName(), platform.getServerGBId());
|
log.info("[平台离线]:{}({})", platform.getName(), platform.getServerGBId());
|
||||||
PlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(platform.getServerGBId());
|
statusTaskRunner.removeRegisterTask(platform.getServerGBId());
|
||||||
platformCatch.setKeepAliveReply(0);
|
statusTaskRunner.removeKeepAliveTask(platform.getServerGBId());
|
||||||
platformCatch.setRegisterAliveReply(0);
|
|
||||||
Platform catchPlatform = platformCatch.getPlatform();
|
subscribeHolder.removeCatalogSubscribe(platform.getServerGBId());
|
||||||
catchPlatform.setStatus(false);
|
subscribeHolder.removeMobilePositionSubscribe(platform.getServerGBId());
|
||||||
platformCatch.setPlatform(catchPlatform);
|
|
||||||
redisCatchStorage.updatePlatformCatchInfo(platformCatch);
|
platformMapper.updateStatus(platform.getId(), false);
|
||||||
platformMapper.updateStatus(platform.getServerGBId(), false);
|
|
||||||
|
|
||||||
// 停止所有推流
|
// 停止所有推流
|
||||||
log.info("[平台离线] {}({}), 停止所有推流", platform.getName(), platform.getServerGBId());
|
log.info("[平台离线] {}({}), 停止所有推流", platform.getName(), platform.getServerGBId());
|
||||||
stopAllPush(platform.getServerGBId());
|
stopAllPush(platform.getServerGBId());
|
||||||
|
|
||||||
// 清除注册定时
|
|
||||||
log.info("[平台离线] {}({}), 停止定时注册任务", platform.getName(), platform.getServerGBId());
|
|
||||||
final String registerTaskKey = REGISTER_KEY_PREFIX + platform.getServerGBId();
|
|
||||||
if (dynamicTask.contains(registerTaskKey)) {
|
|
||||||
dynamicTask.stop(registerTaskKey);
|
|
||||||
}
|
|
||||||
// 清除心跳定时
|
|
||||||
log.info("[平台离线] {}({}), 停止定时发送心跳任务", platform.getName(), platform.getServerGBId());
|
|
||||||
final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + platform.getServerGBId();
|
|
||||||
if (dynamicTask.contains(keepaliveTaskKey)) {
|
|
||||||
// 清除心跳任务
|
|
||||||
dynamicTask.stop(keepaliveTaskKey);
|
|
||||||
}
|
|
||||||
// 停止订阅回复
|
|
||||||
SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(platform.getServerGBId());
|
|
||||||
if (catalogSubscribe != null) {
|
|
||||||
if (catalogSubscribe.getExpires() > 0) {
|
|
||||||
log.info("[平台离线] {}({}), 停止目录订阅回复", platform.getName(), platform.getServerGBId());
|
|
||||||
subscribeHolder.removeCatalogSubscribe(platform.getServerGBId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log.info("[平台离线] {}({}), 停止移动位置订阅回复", platform.getName(), platform.getServerGBId());
|
|
||||||
subscribeHolder.removeMobilePositionSubscribe(platform.getServerGBId());
|
|
||||||
// 发起定时自动重新注册
|
|
||||||
if (!stopRegister) {
|
|
||||||
// 设置为60秒自动尝试重新注册
|
|
||||||
final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + platform.getServerGBId();
|
|
||||||
Platform platformInDb = platformMapper.query(platform.getId());
|
|
||||||
if (platformInDb.isEnable()) {
|
|
||||||
dynamicTask.startCron(registerFailAgainTaskKey,
|
|
||||||
()-> registerTask(platformInDb, null),
|
|
||||||
userSetting.getRegisterAgainAfterTime() * 1000);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void stopAllPush(String platformId) {
|
private void stopAllPush(String platformId) {
|
||||||
@ -554,23 +485,6 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void login(Platform platform) {
|
|
||||||
final String registerTaskKey = REGISTER_KEY_PREFIX + platform.getServerGBId();
|
|
||||||
try {
|
|
||||||
commanderForPlatform.register(platform, eventResult1 -> {
|
|
||||||
log.info("[国标级联] {},开始定时发起注册,间隔为1分钟", platform.getServerGBId());
|
|
||||||
// 添加注册任务
|
|
||||||
dynamicTask.startCron(registerTaskKey,
|
|
||||||
// 注册失败(注册成功时由程序直接调用了online方法)
|
|
||||||
()-> log.info("[国标级联] {}({}),平台离线后持续发起注册,失败", platform.getName(), platform.getServerGBId()),
|
|
||||||
60*1000);
|
|
||||||
}, null);
|
|
||||||
} catch (InvalidArgumentException | ParseException | SipException e) {
|
|
||||||
log.error("[命令发送失败] 国标级联注册: {}", e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendNotifyMobilePosition(String platformId) {
|
public void sendNotifyMobilePosition(String platformId) {
|
||||||
Platform platform = platformMapper.getParentPlatByServerGBId(platformId);
|
Platform platform = platformMapper.getParentPlatByServerGBId(platformId);
|
||||||
@ -918,7 +832,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Platform> queryEnablePlatformList(String serverId) {
|
public List<Platform> queryEnablePlatformList(String serverId) {
|
||||||
return platformMapper.queryEnableParentPlatformList(serverId,true);
|
return platformMapper.queryEnableParentPlatformListByServerId(serverId,true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -926,56 +840,19 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
public void delete(Integer platformId, CommonCallback<Object> callback) {
|
public void delete(Integer platformId, CommonCallback<Object> callback) {
|
||||||
Platform platform = platformMapper.query(platformId);
|
Platform platform = platformMapper.query(platformId);
|
||||||
Assert.notNull(platform, "平台不存在");
|
Assert.notNull(platform, "平台不存在");
|
||||||
// 发送离线消息,无论是否成功都删除缓存
|
if (statusTaskRunner.containsRegister(platform.getServerGBId())) {
|
||||||
PlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(platform.getServerGBId());
|
|
||||||
if (platformCatch != null) {
|
|
||||||
String key = UUID.randomUUID().toString();
|
|
||||||
dynamicTask.startDelay(key, ()->{
|
|
||||||
deletePlatformInfo(platform);
|
|
||||||
if (callback != null) {
|
|
||||||
callback.run(null);
|
|
||||||
}
|
|
||||||
}, 2000);
|
|
||||||
try {
|
try {
|
||||||
commanderForPlatform.unregister(platform, platformCatch.getSipTransactionInfo(), (event -> {
|
SipTransactionInfo transactionInfo = statusTaskRunner.getRegisterTransactionInfo(platform.getServerGBId());
|
||||||
dynamicTask.stop(key);
|
sendUnRegister(platform, transactionInfo);
|
||||||
// 移除平台相关的信息
|
}catch (Exception ignored) {}
|
||||||
deletePlatformInfo(platform);
|
|
||||||
if (callback != null) {
|
|
||||||
callback.run(null);
|
|
||||||
}
|
|
||||||
}), (event -> {
|
|
||||||
dynamicTask.stop(key);
|
|
||||||
// 移除平台相关的信息
|
|
||||||
deletePlatformInfo(platform);
|
|
||||||
if (callback != null) {
|
|
||||||
callback.run(null);
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
} catch (InvalidArgumentException | ParseException | SipException e) {
|
|
||||||
log.error("[命令发送失败] 国标级联 注销: {}", e.getMessage());
|
|
||||||
}
|
|
||||||
}else {
|
|
||||||
deletePlatformInfo(platform);
|
|
||||||
if (callback != null) {
|
|
||||||
callback.run(null);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Transactional
|
|
||||||
public void deletePlatformInfo(Platform platform) {
|
|
||||||
// 删除关联的通道
|
|
||||||
platformChannelMapper.removeChannelsByPlatformId(platform.getId());
|
|
||||||
// 删除关联的分组
|
|
||||||
platformChannelMapper.removePlatformGroupsByPlatformId(platform.getId());
|
|
||||||
// 删除关联的行政区划
|
|
||||||
platformChannelMapper.removePlatformRegionByPlatformId(platform.getId());
|
|
||||||
// 删除redis缓存
|
|
||||||
redisCatchStorage.delPlatformCatchInfo(platform.getServerGBId());
|
|
||||||
// 删除平台信息
|
|
||||||
platformMapper.delete(platform.getId());
|
platformMapper.delete(platform.getId());
|
||||||
|
|
||||||
|
statusTaskRunner.removeRegisterTask(platform.getServerGBId());
|
||||||
|
statusTaskRunner.removeKeepAliveTask(platform.getServerGBId());
|
||||||
|
|
||||||
|
subscribeHolder.removeCatalogSubscribe(platform.getServerGBId());
|
||||||
|
subscribeHolder.removeMobilePositionSubscribe(platform.getServerGBId());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
package com.genersoft.iot.vmp.gb28181.task.platformStatus;
|
package com.genersoft.iot.vmp.gb28181.task.platformStatus;
|
||||||
|
|
||||||
import com.genersoft.iot.vmp.common.CommonCallback;
|
import com.genersoft.iot.vmp.gb28181.bean.PlatformKeepaliveCallback;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.Setter;
|
import lombok.Setter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@ -29,11 +29,18 @@ public class PlatformKeepaliveTask implements Delayed {
|
|||||||
* 到期回调
|
* 到期回调
|
||||||
*/
|
*/
|
||||||
@Getter
|
@Getter
|
||||||
private CommonCallback<String> callback;
|
private PlatformKeepaliveCallback callback;
|
||||||
|
|
||||||
public PlatformKeepaliveTask(String platformServerId, long delayTime, CommonCallback<String> callback) {
|
/**
|
||||||
|
* 心跳发送失败次数
|
||||||
|
*/
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
|
private int failCount;
|
||||||
|
|
||||||
|
public PlatformKeepaliveTask(String platformServerId, long delayTime, PlatformKeepaliveCallback callback) {
|
||||||
this.platformServerId = platformServerId;
|
this.platformServerId = platformServerId;
|
||||||
this.delayTime = delayTime;
|
this.delayTime = System.currentTimeMillis() + delayTime;
|
||||||
this.callback = callback;
|
this.callback = callback;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -42,7 +49,7 @@ public class PlatformKeepaliveTask implements Delayed {
|
|||||||
log.info("[平台心跳到期] 未找到到期处理回调, 平台上级编号: {}", platformServerId);
|
log.info("[平台心跳到期] 未找到到期处理回调, 平台上级编号: {}", platformServerId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
getCallback().run(platformServerId);
|
getCallback().run(platformServerId, failCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -38,7 +38,7 @@ public class PlatformRegisterTask implements Delayed {
|
|||||||
|
|
||||||
public PlatformRegisterTask(String platformServerId, long delayTime, SipTransactionInfo sipTransactionInfo, CommonCallback<String> callback) {
|
public PlatformRegisterTask(String platformServerId, long delayTime, SipTransactionInfo sipTransactionInfo, CommonCallback<String> callback) {
|
||||||
this.platformServerId = platformServerId;
|
this.platformServerId = platformServerId;
|
||||||
this.delayTime = delayTime;
|
this.delayTime = System.currentTimeMillis() + delayTime;
|
||||||
this.callback = callback;
|
this.callback = callback;
|
||||||
this.sipTransactionInfo = sipTransactionInfo;
|
this.sipTransactionInfo = sipTransactionInfo;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,8 +2,6 @@ package com.genersoft.iot.vmp.gb28181.task.platformStatus;
|
|||||||
|
|
||||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
|
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
|
||||||
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskInfo;
|
|
||||||
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
|
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@ -88,12 +86,11 @@ public class PlatformStatusTaskRunner {
|
|||||||
|
|
||||||
public boolean removeRegisterTask(String platformServerId) {
|
public boolean removeRegisterTask(String platformServerId) {
|
||||||
PlatformRegisterTask task = registerSubscribes.get(platformServerId);
|
PlatformRegisterTask task = registerSubscribes.get(platformServerId);
|
||||||
if (task == null) {
|
if (task != null) {
|
||||||
return false;
|
registerSubscribes.remove(platformServerId);
|
||||||
}
|
}
|
||||||
String redisKey = String.format("%s_%s_%s", prefix, userSetting.getServerId(), platformServerId);
|
String redisKey = String.format("%s_%s_%s", prefix, userSetting.getServerId(), platformServerId);
|
||||||
redisTemplate.delete(redisKey);
|
redisTemplate.delete(redisKey);
|
||||||
registerSubscribes.remove(platformServerId);
|
|
||||||
if (registerDelayQueue.contains(task)) {
|
if (registerDelayQueue.contains(task)) {
|
||||||
boolean remove = registerDelayQueue.remove(task);
|
boolean remove = registerDelayQueue.remove(task);
|
||||||
if (!remove) {
|
if (!remove) {
|
||||||
@ -136,24 +133,7 @@ public class PlatformStatusTaskRunner {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public List<PlatformRegisterTaskInfo> getAllRegisterTaskInfo(){
|
public List<PlatformRegisterTaskInfo> getAllRegisterTaskInfo(){
|
||||||
String scanKey = String.format("%s_%s_*", prefix, userSetting.getServerId());
|
return getRegisterTransactionInfoByServerId(userSetting.getServerId());
|
||||||
List<Object> values = RedisUtil.scan(redisTemplate, scanKey);
|
|
||||||
if (values.isEmpty()) {
|
|
||||||
return new ArrayList<>();
|
|
||||||
}
|
|
||||||
List<PlatformRegisterTaskInfo> result = new ArrayList<>();
|
|
||||||
for (Object value : values) {
|
|
||||||
String redisKey = (String)value;
|
|
||||||
PlatformRegisterTaskInfo taskInfo = (PlatformRegisterTaskInfo)redisTemplate.opsForValue().get(redisKey);
|
|
||||||
if (taskInfo == null) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Long expire = redisTemplate.getExpire(redisKey);
|
|
||||||
taskInfo.setExpireTime(expire);
|
|
||||||
result.add(taskInfo);
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addKeepAliveTask(PlatformKeepaliveTask task) {
|
public void addKeepAliveTask(PlatformKeepaliveTask task) {
|
||||||
@ -200,4 +180,24 @@ public class PlatformStatusTaskRunner {
|
|||||||
public boolean containsKeepAlive(String platformServerId) {
|
public boolean containsKeepAlive(String platformServerId) {
|
||||||
return keepaliveSubscribes.containsKey(platformServerId);
|
return keepaliveSubscribes.containsKey(platformServerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public List<PlatformRegisterTaskInfo> getRegisterTransactionInfoByServerId(String serverId) {
|
||||||
|
String scanKey = String.format("%s_%s_*", prefix, serverId);
|
||||||
|
List<Object> values = RedisUtil.scan(redisTemplate, scanKey);
|
||||||
|
if (values.isEmpty()) {
|
||||||
|
return new ArrayList<>();
|
||||||
|
}
|
||||||
|
List<PlatformRegisterTaskInfo> result = new ArrayList<>();
|
||||||
|
for (Object value : values) {
|
||||||
|
String redisKey = (String)value;
|
||||||
|
PlatformRegisterTaskInfo taskInfo = (PlatformRegisterTaskInfo)redisTemplate.opsForValue().get(redisKey);
|
||||||
|
if (taskInfo == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Long expire = redisTemplate.getExpire(redisKey);
|
||||||
|
taskInfo.setExpireTime(expire);
|
||||||
|
result.add(taskInfo);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit;
|
|||||||
|
|
||||||
import com.genersoft.iot.vmp.conf.SipConfig;
|
import com.genersoft.iot.vmp.conf.SipConfig;
|
||||||
import com.genersoft.iot.vmp.gb28181.SipLayer;
|
import com.genersoft.iot.vmp.gb28181.SipLayer;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
|
||||||
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
|
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
|
||||||
import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent;
|
import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent;
|
||||||
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
|
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
|
||||||
@ -13,10 +14,7 @@ import org.springframework.stereotype.Component;
|
|||||||
import org.springframework.util.ObjectUtils;
|
import org.springframework.util.ObjectUtils;
|
||||||
|
|
||||||
import javax.sip.SipException;
|
import javax.sip.SipException;
|
||||||
import javax.sip.header.CSeqHeader;
|
import javax.sip.header.*;
|
||||||
import javax.sip.header.CallIdHeader;
|
|
||||||
import javax.sip.header.UserAgentHeader;
|
|
||||||
import javax.sip.header.ViaHeader;
|
|
||||||
import javax.sip.message.Message;
|
import javax.sip.message.Message;
|
||||||
import javax.sip.message.Request;
|
import javax.sip.message.Request;
|
||||||
import javax.sip.message.Response;
|
import javax.sip.message.Response;
|
||||||
@ -73,6 +71,7 @@ public class SIPSender {
|
|||||||
if (okEvent != null || errorEvent != null) {
|
if (okEvent != null || errorEvent != null) {
|
||||||
CallIdHeader callIdHeader = (CallIdHeader) message.getHeader(CallIdHeader.NAME);
|
CallIdHeader callIdHeader = (CallIdHeader) message.getHeader(CallIdHeader.NAME);
|
||||||
CSeqHeader cSeqHeader = (CSeqHeader) message.getHeader(CSeqHeader.NAME);
|
CSeqHeader cSeqHeader = (CSeqHeader) message.getHeader(CSeqHeader.NAME);
|
||||||
|
FromHeader fromHeader = (FromHeader) message.getHeader(FromHeader.NAME);
|
||||||
String key = callIdHeader.getCallId() + cSeqHeader.getSeqNumber();
|
String key = callIdHeader.getCallId() + cSeqHeader.getSeqNumber();
|
||||||
SipEvent sipEvent = SipEvent.getInstance(key, eventResult -> {
|
SipEvent sipEvent = SipEvent.getInstance(key, eventResult -> {
|
||||||
sipSubscribe.removeSubscribe(key);
|
sipSubscribe.removeSubscribe(key);
|
||||||
@ -85,6 +84,18 @@ public class SIPSender {
|
|||||||
errorEvent.response(eventResult);
|
errorEvent.response(eventResult);
|
||||||
}
|
}
|
||||||
}), timeout == null ? sipConfig.getTimeout() : timeout);
|
}), timeout == null ? sipConfig.getTimeout() : timeout);
|
||||||
|
SipTransactionInfo sipTransactionInfo = new SipTransactionInfo();
|
||||||
|
sipTransactionInfo.setFromTag(fromHeader.getTag());
|
||||||
|
sipTransactionInfo.setFromTag(fromHeader.getTag());
|
||||||
|
|
||||||
|
|
||||||
|
if (message instanceof Response) {
|
||||||
|
ToHeader toHeader = (ToHeader) message.getHeader(ToHeader.NAME);
|
||||||
|
sipTransactionInfo.setToTag(toHeader.getTag());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
sipEvent.setSipTransactionInfo(sipTransactionInfo);
|
||||||
sipSubscribe.addSubscribe(key, sipEvent);
|
sipSubscribe.addSubscribe(key, sipEvent);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -88,7 +88,6 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
|
|||||||
log.error("[命令发送失败] 国标级联 再次注册: {}", e.getMessage());
|
log.error("[命令发送失败] 国标级联 再次注册: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
}else if (response.getStatusCode() == Response.OK){
|
}else if (response.getStatusCode() == Response.OK){
|
||||||
|
|
||||||
if (platformRegisterInfo.isRegister()) {
|
if (platformRegisterInfo.isRegister()) {
|
||||||
SipTransactionInfo sipTransactionInfo = new SipTransactionInfo(response);
|
SipTransactionInfo sipTransactionInfo = new SipTransactionInfo(response);
|
||||||
platformService.online(parentPlatform, sipTransactionInfo);
|
platformService.online(parentPlatform, sipTransactionInfo);
|
||||||
|
|||||||
@ -23,8 +23,6 @@ public interface IRedisCatchStorage {
|
|||||||
*/
|
*/
|
||||||
Long getCSEQ();
|
Long getCSEQ();
|
||||||
|
|
||||||
void updatePlatformCatchInfo(PlatformCatch parentPlatformCatch);
|
|
||||||
|
|
||||||
PlatformCatch queryPlatformCatchInfo(String platformGbId);
|
PlatformCatch queryPlatformCatchInfo(String platformGbId);
|
||||||
|
|
||||||
void delPlatformCatchInfo(String platformGbId);
|
void delPlatformCatchInfo(String platformGbId);
|
||||||
|
|||||||
@ -73,12 +73,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
|
|||||||
redisTemplate.opsForValue().set(key, 1);
|
redisTemplate.opsForValue().set(key, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void updatePlatformCatchInfo(PlatformCatch parentPlatformCatch) {
|
|
||||||
String key = VideoManagerConstants.PLATFORM_CATCH_PREFIX + userSetting.getServerId() + "_" + parentPlatformCatch.getId();
|
|
||||||
redisTemplate.opsForValue().set(key, parentPlatformCatch);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PlatformCatch queryPlatformCatchInfo(String platformGbId) {
|
public PlatformCatch queryPlatformCatchInfo(String platformGbId) {
|
||||||
return (PlatformCatch)redisTemplate.opsForValue().get(VideoManagerConstants.PLATFORM_CATCH_PREFIX + userSetting.getServerId() + "_" + platformGbId);
|
return (PlatformCatch)redisTemplate.opsForValue().get(VideoManagerConstants.PLATFORM_CATCH_PREFIX + userSetting.getServerId() + "_" + platformGbId);
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user