重构设备订阅保活和更新机制

This commit is contained in:
lin 2025-05-21 14:57:10 +08:00
parent a1ba811962
commit dd17462b68
28 changed files with 718 additions and 573 deletions

View File

@ -0,0 +1,7 @@
package com.genersoft.iot.vmp.common;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
public interface SubscribeCallback{
public void run(String deviceId, SipTransactionInfo transactionInfo);
}

View File

@ -1,18 +1,23 @@
package com.genersoft.iot.vmp.gb28181.bean; package com.genersoft.iot.vmp.gb28181.bean;
import lombok.Data;
/** /**
* 通过redis分发报警消息 * 通过redis分发报警消息
*/ */
@Data
public class AlarmChannelMessage { public class AlarmChannelMessage {
/** /**
* 国标编号 * 通道国标编号
*/ */
private String gbId; private String gbId;
/** /**
* 报警编号 * 报警编号
*/ */
private int alarmSn; private int alarmSn;
/** /**
* 告警类型 * 告警类型
*/ */
@ -22,36 +27,4 @@ public class AlarmChannelMessage {
* 报警描述 * 报警描述
*/ */
private String alarmDescription; private String alarmDescription;
public String getGbId() {
return gbId;
}
public void setGbId(String gbId) {
this.gbId = gbId;
}
public int getAlarmSn() {
return alarmSn;
}
public void setAlarmSn(int alarmSn) {
this.alarmSn = alarmSn;
}
public int getAlarmType() {
return alarmType;
}
public void setAlarmType(int alarmType) {
this.alarmType = alarmType;
}
public String getAlarmDescription() {
return alarmDescription;
}
public void setAlarmDescription(String alarmDescription) {
this.alarmDescription = alarmDescription;
}
} }

View File

@ -1,7 +1,9 @@
package com.genersoft.iot.vmp.gb28181.bean; package com.genersoft.iot.vmp.gb28181.bean;
import gov.nist.javax.sip.message.SIPResponse; import gov.nist.javax.sip.message.SIPResponse;
import lombok.Data;
@Data
public class SipTransactionInfo { public class SipTransactionInfo {
private String callId; private String callId;
@ -31,43 +33,4 @@ public class SipTransactionInfo {
public SipTransactionInfo() { public SipTransactionInfo() {
} }
public String getCallId() {
return callId;
}
public void setCallId(String callId) {
this.callId = callId;
}
public String getFromTag() {
return fromTag;
}
public void setFromTag(String fromTag) {
this.fromTag = fromTag;
}
public String getToTag() {
return toTag;
}
public void setToTag(String toTag) {
this.toTag = toTag;
}
public String getViaBranch() {
return viaBranch;
}
public void setViaBranch(String viaBranch) {
this.viaBranch = viaBranch;
}
public boolean isAsSender() {
return asSender;
}
public void setAsSender(boolean asSender) {
this.asSender = asSender;
}
} }

View File

@ -1,19 +1,20 @@
package com.genersoft.iot.vmp.gb28181.bean; package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* @author lin * @author lin
*/ */
@Slf4j
@Component @Component
public class SubscribeHolder { public class SubscribeHolder {
@ -23,44 +24,40 @@ public class SubscribeHolder {
@Autowired @Autowired
private UserSetting userSetting; private UserSetting userSetting;
private final String taskOverduePrefix = "subscribe_overdue_"; @Autowired
private RedisTemplate<Object, Object> redisTemplate;
private static ConcurrentHashMap<String, SubscribeInfo> catalogMap = new ConcurrentHashMap<>(); private final String prefix = "VMP_SUBSCRIBE_OVERDUE";
private static ConcurrentHashMap<String, SubscribeInfo> mobilePositionMap = new ConcurrentHashMap<>();
public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) { public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) {
catalogMap.put(platformId, subscribeInfo); log.info("[国标级联] 添加目录订阅,平台: {} 有效期: {}", platformId, subscribeInfo.getExpires());
if (subscribeInfo.getExpires() > 0) { if (subscribeInfo.getExpires() < 0) {
// 添加订阅到期 return;
String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId;
// 添加任务处理订阅过期
dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()),
subscribeInfo.getExpires() * 1000);
} }
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platformId);
Duration duration = Duration.ofSeconds(subscribeInfo.getExpires());
redisTemplate.opsForValue().set(key, subscribeInfo, duration);
} }
public SubscribeInfo getCatalogSubscribe(String platformId) { public SubscribeInfo getCatalogSubscribe(String platformId) {
return catalogMap.get(platformId); String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platformId);
return (SubscribeInfo)redisTemplate.opsForValue().get(key);
} }
public void removeCatalogSubscribe(String platformId) { public void removeCatalogSubscribe(String platformId) {
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platformId);
catalogMap.remove(platformId); redisTemplate.delete(key);
String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId;
Runnable runnable = dynamicTask.get(taskOverdueKey);
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
subscribeTask.stop(null);
}
// 添加任务处理订阅过期
dynamicTask.stop(taskOverdueKey);
} }
public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo, Runnable gpsTask) { public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo, Runnable gpsTask) {
mobilePositionMap.put(platformId, subscribeInfo); log.info("[国标级联] 添加移动位置订阅,平台: {} 有效期: {}", platformId, subscribeInfo.getExpires());
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "MobilePosition_" + platformId; if (subscribeInfo.getExpires() < 0) {
// 添加任务处理GPS定时推送 return;
}
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platformId);
Duration duration = Duration.ofSeconds(subscribeInfo.getExpires());
redisTemplate.opsForValue().set(key, subscribeInfo, duration);
int cycleForCatalog; int cycleForCatalog;
if (subscribeInfo.getGpsInterval() <= 0) { if (subscribeInfo.getGpsInterval() <= 0) {
@ -68,59 +65,55 @@ public class SubscribeHolder {
}else { }else {
cycleForCatalog = subscribeInfo.getGpsInterval(); cycleForCatalog = subscribeInfo.getGpsInterval();
} }
dynamicTask.startCron(key, gpsTask, dynamicTask.startCron(
key,
() -> {
SubscribeInfo subscribe = getMobilePositionSubscribe(platformId);
if (subscribe != null) {
gpsTask.run();
}else {
dynamicTask.stop(key);
}
},
cycleForCatalog * 1000); cycleForCatalog * 1000);
String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId;
if (subscribeInfo.getExpires() > 0) {
// 添加任务处理订阅过期
dynamicTask.startDelay(taskOverdueKey, () -> {
removeMobilePositionSubscribe(subscribeInfo.getId());
},
subscribeInfo.getExpires() * 1000);
}
} }
public SubscribeInfo getMobilePositionSubscribe(String platformId) { public SubscribeInfo getMobilePositionSubscribe(String platformId) {
return mobilePositionMap.get(platformId); String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platformId);
return (SubscribeInfo)redisTemplate.opsForValue().get(key);
} }
public void removeMobilePositionSubscribe(String platformId) { public void removeMobilePositionSubscribe(String platformId) {
mobilePositionMap.remove(platformId); String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platformId);
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "MobilePosition_" + platformId; redisTemplate.delete(key);
// 结束任务处理GPS定时推送
dynamicTask.stop(key);
String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId;
Runnable runnable = dynamicTask.get(taskOverdueKey);
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
subscribeTask.stop(null);
}
// 添加任务处理订阅过期
dynamicTask.stop(taskOverdueKey);
} }
public List<String> getAllCatalogSubscribePlatform() { public List<String> getAllCatalogSubscribePlatform(List<Platform> platformList) {
List<String> platforms = new ArrayList<>(); if (platformList == null || platformList.isEmpty()) {
if(catalogMap.size() > 0) { return new ArrayList<>();
for (String key : catalogMap.keySet()) { }
platforms.add(catalogMap.get(key).getId()); List<String> result = new ArrayList<>();
for (Platform platform : platformList) {
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platform.getServerGBId());
if (redisTemplate.hasKey(key)) {
result.add(platform.getServerId());
} }
} }
return platforms; return result;
} }
public List<String> getAllMobilePositionSubscribePlatform() { public List<String> getAllMobilePositionSubscribePlatform(List<Platform> platformList) {
List<String> platforms = new ArrayList<>(); if (platformList == null || platformList.isEmpty()) {
if(!mobilePositionMap.isEmpty()) { return new ArrayList<>();
for (String key : mobilePositionMap.keySet()) { }
platforms.add(mobilePositionMap.get(key).getId()); List<String> result = new ArrayList<>();
for (Platform platform : platformList) {
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platform.getServerGBId());
if (redisTemplate.hasKey(key)) {
result.add(platform.getServerId());
} }
} }
return platforms; return result;
}
public void removeAllSubscribe(String platformId) {
removeMobilePositionSubscribe(platformId);
removeCatalogSubscribe(platformId);
} }
} }

View File

@ -11,9 +11,6 @@ import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
@ -40,9 +37,6 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.file.Files; import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@Tag(name = "国标设备查询", description = "国标设备查询") @Tag(name = "国标设备查询", description = "国标设备查询")
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
@ -79,7 +73,7 @@ public class DeviceQuery {
@Parameter(name = "deviceId", description = "设备国标编号", required = true) @Parameter(name = "deviceId", description = "设备国标编号", required = true)
@GetMapping("/devices/{deviceId}") @GetMapping("/devices/{deviceId}")
public Device devices(@PathVariable String deviceId){ public Device devices(@PathVariable String deviceId){
return deviceService.getDeviceByDeviceId(deviceId); return deviceService.getDeviceByDeviceId(deviceId);
} }
@ -144,28 +138,10 @@ public class DeviceQuery {
} }
// 清除redis记录 // 清除redis记录
boolean isSuccess = deviceService.delete(deviceId); deviceService.delete(deviceId);
if (isSuccess) { JSONObject json = new JSONObject();
inviteStreamService.clearInviteInfo(deviceId); json.put("deviceId", deviceId);
// 停止此设备的订阅更新 return json.toString();
Set<String> allKeys = dynamicTask.getAllKeys();
for (String key : allKeys) {
if (key.startsWith(deviceId)) {
Runnable runnable = dynamicTask.get(key);
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
subscribeTask.stop(null);
}
dynamicTask.stop(key);
}
}
JSONObject json = new JSONObject();
json.put("deviceId", deviceId);
return json.toString();
} else {
log.warn("设备信息删除API调用失败");
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备信息删除API调用失败");
}
} }
@Operation(summary = "分页查询子目录通道", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Operation(summary = "分页查询子目录通道", security = @SecurityRequirement(name = JwtUtils.HEADER))
@ -355,28 +331,6 @@ public class DeviceQuery {
return wvpResult; return wvpResult;
} }
@GetMapping("/{deviceId}/subscribe_info")
@Operation(summary = "获取设备的订阅状态", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
public WVPResult<Map<String, Integer>> getSubscribeInfo(@PathVariable String deviceId) {
Set<String> allKeys = dynamicTask.getAllKeys();
Map<String, Integer> dialogStateMap = new HashMap<>();
for (String key : allKeys) {
if (key.startsWith(deviceId)) {
ISubscribeTask subscribeTask = (ISubscribeTask)dynamicTask.get(key);
if (subscribeTask instanceof CatalogSubscribeTask) {
dialogStateMap.put("catalog", 1);
}else if (subscribeTask instanceof MobilePositionSubscribeTask) {
dialogStateMap.put("mobilePosition", 1);
}
}
}
WVPResult<Map<String, Integer>> wvpResult = new WVPResult<>();
wvpResult.setCode(0);
wvpResult.setData(dialogStateMap);
return wvpResult;
}
@GetMapping("/snap/{deviceId}/{channelId}") @GetMapping("/snap/{deviceId}/{channelId}")
@Operation(summary = "请求截图") @Operation(summary = "请求截图")
@Parameter(name = "deviceId", description = "设备国标编号", required = true) @Parameter(name = "deviceId", description = "设备国标编号", required = true)

View File

@ -7,7 +7,7 @@ import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.service.IMobilePositionService; import com.genersoft.iot.vmp.service.IMobilePositionService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.github.pagehelper.util.StringUtil; import com.github.pagehelper.util.StringUtil;
@ -37,10 +37,10 @@ public class MobilePositionController {
@Autowired @Autowired
private IMobilePositionService mobilePositionService; private IMobilePositionService mobilePositionService;
@Autowired @Autowired
private SIPCommander cmder; private ISIPCommander cmder;
@Autowired @Autowired
private DeferredResultHolder resultHolder; private DeferredResultHolder resultHolder;

View File

@ -97,4 +97,7 @@ public interface PlatformMapper {
@Select("SELECT * FROM wvp_platform WHERE server_id = #{serverId}") @Select("SELECT * FROM wvp_platform WHERE server_id = #{serverId}")
List<Platform> queryByServerId(@Param("serverId") String serverId); List<Platform> queryByServerId(@Param("serverId") String serverId);
@Select("SELECT * FROM wvp_platform ")
List<Platform> queryAll();
} }

View File

@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.gb28181.event.subscribe.catalog; package com.genersoft.iot.vmp.gb28181.event.subscribe.catalog;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
@ -8,7 +7,6 @@ import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService; import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
@ -17,7 +15,10 @@ import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
import javax.sip.SipException; import javax.sip.SipException;
import java.text.ParseException; import java.text.ParseException;
import java.util.*; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** /**
* catalog事件 * catalog事件
@ -26,6 +27,9 @@ import java.util.*;
@Component @Component
public class CatalogEventLister implements ApplicationListener<CatalogEvent> { public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
@Autowired
private IPlatformService platformService;
@Autowired @Autowired
private IPlatformChannelService platformChannelService; private IPlatformChannelService platformChannelService;
@ -53,8 +57,9 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
} }
}else { }else {
List<Platform> allPlatform = platformService.queryAll();
// 获取所用订阅 // 获取所用订阅
List<String> platforms = subscribeHolder.getAllCatalogSubscribePlatform(); List<String> platforms = subscribeHolder.getAllCatalogSubscribePlatform(allPlatform);
if (event.getChannels() != null) { if (event.getChannels() != null) {
if (!platforms.isEmpty()) { if (!platforms.isEmpty()) {
for (CommonGBChannel deviceChannel : event.getChannels()) { for (CommonGBChannel deviceChannel : event.getChannels()) {
@ -159,4 +164,4 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
} }
} }
} }

View File

@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -24,6 +25,9 @@ import java.util.List;
@Component @Component
public class MobilePositionEventLister implements ApplicationListener<MobilePositionEvent> { public class MobilePositionEventLister implements ApplicationListener<MobilePositionEvent> {
@Autowired
private IPlatformService platformService;
@Autowired @Autowired
private IPlatformChannelService platformChannelService; private IPlatformChannelService platformChannelService;
@ -38,9 +42,9 @@ public class MobilePositionEventLister implements ApplicationListener<MobilePosi
if (event.getMobilePosition().getChannelId() == 0) { if (event.getMobilePosition().getChannelId() == 0) {
return; return;
} }
List<Platform> allPlatforms = platformService.queryAll();
// 获取所用订阅 // 获取所用订阅
List<String> platforms = subscribeHolder.getAllMobilePositionSubscribePlatform(); List<String> platforms = subscribeHolder.getAllMobilePositionSubscribePlatform(allPlatforms);
if (platforms.isEmpty()) { if (platforms.isEmpty()) {
return; return;
} }
@ -65,4 +69,3 @@ public class MobilePositionEventLister implements ApplicationListener<MobilePosi
} }
} }
} }

View File

@ -32,7 +32,7 @@ public interface IDeviceService {
* @param device 设备信息 * @param device 设备信息
* @return 布尔 * @return 布尔
*/ */
boolean addCatalogSubscribe(Device device); boolean addCatalogSubscribe(Device device, SipTransactionInfo transactionInfo);
/** /**
* 移除目录订阅 * 移除目录订阅
@ -46,7 +46,7 @@ public interface IDeviceService {
* @param device 设备信息 * @param device 设备信息
* @return 布尔 * @return 布尔
*/ */
boolean addMobilePositionSubscribe(Device device); boolean addMobilePositionSubscribe(Device device, SipTransactionInfo transactionInfo);
/** /**
* 移除移动位置订阅 * 移除移动位置订阅

View File

@ -48,4 +48,6 @@ public interface IPlatformChannelService {
void checkRegionAdd(List<CommonGBChannel> channelList); void checkRegionAdd(List<CommonGBChannel> channelList);
void checkRegionRemove(List<CommonGBChannel> channelList, List<Region> regionList); void checkRegionRemove(List<CommonGBChannel> channelList, List<Region> regionList);
List<Platform> queryByPlatformBySharChannelId(String gbId);
} }

View File

@ -85,4 +85,7 @@ public interface IPlatformService {
List<Platform> queryEnablePlatformList(String serverId); List<Platform> queryEnablePlatformList(String serverId);
void delete(Integer platformId, CommonCallback<Object> callback); void delete(Integer platformId, CommonCallback<Object> callback);
List<Platform> queryAll();
} }

View File

@ -15,9 +15,11 @@ import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.task.SubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask; import com.genersoft.iot.vmp.gb28181.task.SubscribeTaskInfo;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; import com.genersoft.iot.vmp.gb28181.task.SubscribeTaskRunner;
import com.genersoft.iot.vmp.gb28181.task.impl.SubscribeTaskForCatalog;
import com.genersoft.iot.vmp.gb28181.task.impl.SubscribeTaskForMobilPosition;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.MediaServer;
@ -32,13 +34,17 @@ import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo; import com.github.pagehelper.PageInfo;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException; import javax.sip.SipException;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import java.text.ParseException; import java.text.ParseException;
@ -53,7 +59,7 @@ import java.util.concurrent.TimeUnit;
*/ */
@Slf4j @Slf4j
@Service @Service
public class DeviceServiceImpl implements IDeviceService { public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Autowired @Autowired
private DynamicTask dynamicTask; private DynamicTask dynamicTask;
@ -100,10 +106,46 @@ public class DeviceServiceImpl implements IDeviceService {
@Autowired @Autowired
private IRedisRpcService redisRpcService; private IRedisRpcService redisRpcService;
@Autowired
private SubscribeTaskRunner subscribeTaskRunner;
private Device getDeviceByDeviceIdFromDb(String deviceId) { private Device getDeviceByDeviceIdFromDb(String deviceId) {
return deviceMapper.getDeviceByDeviceId(deviceId); return deviceMapper.getDeviceByDeviceId(deviceId);
} }
@Override
public void run(String... args) throws Exception {
// TODO 处理设备离线
// 处理订阅任务
List<SubscribeTaskInfo> taskInfoList = subscribeTaskRunner.getAllTaskInfo();
if (!taskInfoList.isEmpty()) {
for (SubscribeTaskInfo taskInfo : taskInfoList) {
if (taskInfo == null) {
continue;
}
Device device = getDeviceByDeviceId(taskInfo.getDeviceId());
if (device == null || !device.isOnLine()) {
subscribeTaskRunner.removeSubscribe(taskInfo.getKey());
continue;
}
if (SubscribeTaskForCatalog.name.equals(taskInfo.getName())) {
device.setSubscribeCycleForCatalog((int)taskInfo.getExpireTime());
SubscribeTask subscribeTask = SubscribeTaskForCatalog.getInstance(device, this::catalogSubscribeExpire, taskInfo.getTransactionInfo());
if (subscribeTask != null) {
subscribeTaskRunner.addSubscribe(subscribeTask);
}
}else if (SubscribeTaskForMobilPosition.name.equals(taskInfo.getName())) {
device.setSubscribeCycleForMobilePosition((int)taskInfo.getExpireTime());
SubscribeTask subscribeTask = SubscribeTaskForMobilPosition.getInstance(device, this::mobilPositionSubscribeExpire, taskInfo.getTransactionInfo());
if (subscribeTask != null) {
subscribeTaskRunner.addSubscribe(subscribeTask);
}
}
}
}
}
@Override @Override
public void online(Device device, SipTransactionInfo sipTransactionInfo) { public void online(Device device, SipTransactionInfo sipTransactionInfo) {
log.info("[设备上线] deviceId{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort()); log.info("[设备上线] deviceId{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort());
@ -164,12 +206,12 @@ public class DeviceServiceImpl implements IDeviceService {
// TODO 如果设备下的通道级联到了其他平台那么需要发送事件或者notify给上级平台 // TODO 如果设备下的通道级联到了其他平台那么需要发送事件或者notify给上级平台
} }
// 上线添加订阅 // 上线添加订阅
if (device.getSubscribeCycleForCatalog() > 0) { if (device.getSubscribeCycleForCatalog() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForCatalog.getKey(device))) {
// 查询在线设备那些开启了订阅为设备开启定时的目录订阅 // 查询在线设备那些开启了订阅为设备开启定时的目录订阅
addCatalogSubscribe(device); addCatalogSubscribe(device, null);
} }
if (device.getSubscribeCycleForMobilePosition() > 0) { if (device.getSubscribeCycleForMobilePosition() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
addMobilePositionSubscribe(device); addMobilePositionSubscribe(device, null);
} }
if (userSetting.getDeviceStatusNotify()) { if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息 // 发送redis消息
@ -254,20 +296,87 @@ public class DeviceServiceImpl implements IDeviceService {
} }
} }
// 订阅丢失检查
@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.SECONDS)
public void lostCheck(){
// 获取所有设备
List<Device> deviceList = redisCatchStorage.getAllDevices();
if (deviceList.isEmpty()) {
return;
}
for (Device device : deviceList) {
if (device == null || !device.isOnLine()) {
continue;
}
if (device.getSubscribeCycleForCatalog() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForCatalog.getKey(device))) {
log.info("[订阅丢失] 目录订阅, 编号: {}, 重新发起订阅", device.getDeviceId());
addCatalogSubscribe(device, null);
}
if (device.getSubscribeCycleForMobilePosition() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
log.info("[订阅丢失] 移动位置订阅, 编号: {}, 重新发起订阅", device.getDeviceId());
addMobilePositionSubscribe(device, null);
}
}
}
private void catalogSubscribeExpire(String deviceId, SipTransactionInfo transactionInfo) {
log.info("[目录订阅] 到期, 编号: {}", deviceId);
Device device = getDeviceByDeviceId(deviceId);
if (device == null) {
log.info("[目录订阅] 到期, 编号: {}, 设备不存在, 忽略", deviceId);
return;
}
if (device.getSubscribeCycleForCatalog() > 0) {
addCatalogSubscribe(device, transactionInfo);
}
}
private void mobilPositionSubscribeExpire(String deviceId, SipTransactionInfo transactionInfo) {
log.info("[移动位置订阅] 到期, 编号: {}", deviceId);
Device device = getDeviceByDeviceId(deviceId);
if (device == null) {
log.info("[移动位置订阅] 到期, 编号: {}, 设备不存在, 忽略", deviceId);
return;
}
if (device.getSubscribeCycleForMobilePosition() > 0) {
addMobilePositionSubscribe(device, transactionInfo);
}
}
@Override @Override
public boolean addCatalogSubscribe(Device device) { public boolean addCatalogSubscribe(Device device, SipTransactionInfo transactionInfo) {
if (device == null || device.getSubscribeCycleForCatalog() < 0) { if (device == null || device.getSubscribeCycleForCatalog() < 0) {
return false; return false;
} }
log.info("[添加目录订阅] 设备{}", device.getDeviceId()); log.info("[添加目录订阅] 设备 {}", device.getDeviceId());
// 添加目录订阅 try {
CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander, dynamicTask); sipCommander.catalogSubscribe(device, transactionInfo, eventResult -> {
// 刷新订阅 ResponseEvent event = (ResponseEvent) eventResult.event;
int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForCatalog(),30); // 成功
// 设置最小值为30 log.info("[目录订阅]成功: {}", device.getDeviceId());
dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, (subscribeCycleForCatalog -1) * 1000); if (!subscribeTaskRunner.containsKey(SubscribeTaskForCatalog.getKey(device))) {
SIPResponse response = (SIPResponse) event.getResponse();
SipTransactionInfo transactionInfoForResonse = new SipTransactionInfo(response);
SubscribeTask subscribeTask = SubscribeTaskForCatalog.getInstance(device, this::catalogSubscribeExpire, transactionInfoForResonse);
if (subscribeTask != null) {
subscribeTaskRunner.addSubscribe(subscribeTask);
}
}else {
subscribeTaskRunner.updateDelay(SubscribeTaskForCatalog.getKey(device), (device.getSubscribeCycleForCatalog() * 1000L - 500L) + System.currentTimeMillis());
}
catalogSubscribeTask.run(); },eventResult -> {
// 失败
log.warn("[目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 目录订阅: {}", e.getMessage());
} finally {
// 无论是否发起成功都保存起来如果失败后续任务会继续订阅
deviceMapper.updateSubscribeCatalog(device);
redisCatchStorage.updateDevice(device);
}
return true; return true;
} }
@ -280,72 +389,110 @@ public class DeviceServiceImpl implements IDeviceService {
return false; return false;
} }
log.info("[移除目录订阅]: {}", device.getDeviceId()); log.info("[移除目录订阅]: {}", device.getDeviceId());
String taskKey = device.getDeviceId() + "catalog"; device.setSubscribeCycleForCatalog(0);
if (device.isOnLine()) { String key = SubscribeTaskForCatalog.getKey(device);
Runnable runnable = dynamicTask.get(taskKey); if (subscribeTaskRunner.containsKey(key)) {
if (runnable instanceof ISubscribeTask) { SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key);
ISubscribeTask subscribeTask = (ISubscribeTask) runnable; if (transactionInfo == null) {
subscribeTask.stop(callback); log.warn("[移除目录订阅] 未找到事务信息,{}", device.getDeviceId());
}else {
log.info("[移除目录订阅]失败,未找到订阅任务 : {}", device.getDeviceId());
if (callback != null) {
callback.run(false);
}
} }
}else { try {
log.info("[移除移动位置订阅]失败,设备已经离线 : {}", device.getDeviceId()); sipCommander.catalogSubscribe(device, transactionInfo, eventResult -> {
if (callback != null) { // 成功
callback.run(false); log.info("[取消目录订阅]成功: {}", device.getDeviceId());
subscribeTaskRunner.removeSubscribe(SubscribeTaskForCatalog.getKey(device));
if (callback != null) {
callback.run(true);
}
},eventResult -> {
// 失败
log.warn("[取消目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
});
}catch (Exception e) {
// 失败
log.warn("[取消目录订阅]失败: {}-{} ", device.getDeviceId(), e.getMessage());
}finally {
// 无论是否发起成功都保存起来如果失败到期后将不再发起
deviceMapper.updateSubscribeCatalog(device);
redisCatchStorage.updateDevice(device);
} }
} }
dynamicTask.stop(taskKey);
return true; return true;
} }
@Override @Override
public boolean addMobilePositionSubscribe(Device device) { public boolean addMobilePositionSubscribe(Device device, SipTransactionInfo transactionInfo) {
if (device == null || device.getSubscribeCycleForMobilePosition() < 0) { if (device == null || device.getSubscribeCycleForMobilePosition() < 0) {
return false; return false;
} }
log.info("[添加移动位置订阅] 设备{}", device.getDeviceId()); log.info("[添加移动位置订阅] 设备 {}", device.getDeviceId());
// 添加目录订阅 try {
MobilePositionSubscribeTask mobilePositionSubscribeTask = new MobilePositionSubscribeTask(device, sipCommander, dynamicTask); sipCommander.mobilePositionSubscribe(device, transactionInfo, eventResult -> {
// 设置最小值为30 ResponseEvent event = (ResponseEvent) eventResult.event;
int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForMobilePosition(),30); // 成功
// 刷新订阅 log.info("[移动位置订阅]成功: {}", device.getDeviceId());
dynamicTask.startCron(device.getDeviceId() + "mobile_position" , mobilePositionSubscribeTask, subscribeCycleForCatalog * 1000); if (!subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
mobilePositionSubscribeTask.run(); SIPResponse response = (SIPResponse) event.getResponse();
SipTransactionInfo transactionInfoForResonse = new SipTransactionInfo(response);
SubscribeTask subscribeTask = SubscribeTaskForMobilPosition.getInstance(device, this::catalogSubscribeExpire, transactionInfoForResonse);
if (subscribeTask != null) {
subscribeTaskRunner.addSubscribe(subscribeTask);
}
}else {
subscribeTaskRunner.updateDelay(SubscribeTaskForMobilPosition.getKey(device), (device.getSubscribeCycleForCatalog() * 1000L - 500L) + System.currentTimeMillis());
}
},eventResult -> {
// 失败
log.warn("[移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 移动位置订阅: {}", e.getMessage());
}finally {
// 无论是否发起成功都保存起来如果失败后续任务会继续订阅
deviceMapper.updateSubscribeMobilePosition(device);
redisCatchStorage.updateDevice(device);
}
return true; return true;
} }
@Override @Override
public boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback) { public boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback) {
if (device == null || device.getSubscribeCycleForCatalog() < 0) { if (device == null || device.getSubscribeCycleForMobilePosition() < 0) {
if (callback != null) { if (callback != null) {
callback.run(false); callback.run(false);
} }
return false; return false;
} }
log.info("[移除移动位置订阅]: {}", device.getDeviceId()); log.info("[移除移动位置订阅]: {}", device.getDeviceId());
String taskKey = device.getDeviceId() + "mobile_position"; device.setSubscribeCycleForMobilePosition(0);
if (device.isOnLine()) { String key = SubscribeTaskForMobilPosition.getKey(device);
Runnable runnable = dynamicTask.get(taskKey); if (subscribeTaskRunner.containsKey(key)) {
if (runnable instanceof ISubscribeTask) { SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key);
ISubscribeTask subscribeTask = (ISubscribeTask) runnable; if (transactionInfo == null) {
subscribeTask.stop(callback); log.warn("[移除移动位置订阅] 未找到事务信息,{}", device.getDeviceId());
}else {
log.info("[移除移动位置订阅]失败,未找到订阅任务 : {}", device.getDeviceId());
if (callback != null) {
callback.run(false);
}
} }
}else { try {
log.info("[移除移动位置订阅]失败,设备已经离线 : {}", device.getDeviceId()); sipCommander.mobilePositionSubscribe(device, transactionInfo, eventResult -> {
if (callback != null) { // 成功
callback.run(false); log.info("[取消移动位置订阅]成功: {}", device.getDeviceId());
subscribeTaskRunner.removeSubscribe(SubscribeTaskForMobilPosition.getKey(device));
if (callback != null) {
callback.run(true);
}
},eventResult -> {
// 失败
log.warn("[取消移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
});
}catch (Exception e) {
// 失败
log.warn("[取消移动位置订阅]失败: {}-{} ", device.getDeviceId(), e.getMessage());
}finally {
// 无论是否发起成功都保存起来如果失败到期后将不再发起
deviceMapper.updateSubscribeMobilePosition(device);
redisCatchStorage.updateDevice(device);
} }
} }
dynamicTask.stop(taskKey);
return true; return true;
} }
@ -499,10 +646,20 @@ public class DeviceServiceImpl implements IDeviceService {
public boolean delete(String deviceId) { public boolean delete(String deviceId) {
Device device = getDeviceByDeviceIdFromDb(deviceId); Device device = getDeviceByDeviceIdFromDb(deviceId);
Assert.notNull(device, "未找到设备"); Assert.notNull(device, "未找到设备");
if (device.getSubscribeCycleForCatalog() > 0) {
removeCatalogSubscribe(device, null);
}
if (device.getSubscribeCycleForMobilePosition() > 0) {
removeMobilePositionSubscribe(device, null);
}
// 停止状态检测
String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + device.getDeviceId();
dynamicTask.stop(registerExpireTaskKey);
platformChannelMapper.delChannelForDeviceId(deviceId); platformChannelMapper.delChannelForDeviceId(deviceId);
deviceChannelMapper.cleanChannelsByDeviceId(device.getId()); deviceChannelMapper.cleanChannelsByDeviceId(device.getId());
deviceMapper.del(deviceId); deviceMapper.del(deviceId);
redisCatchStorage.removeDevice(deviceId); redisCatchStorage.removeDevice(deviceId);
inviteStreamService.clearInviteInfo(deviceId);
return true; return true;
} }
@ -564,16 +721,13 @@ public class DeviceServiceImpl implements IDeviceService {
device.setSubscribeCycleForCatalog(cycle); device.setSubscribeCycleForCatalog(cycle);
if (cycle > 0) { if (cycle > 0) {
// 开启订阅 // 开启订阅
addCatalogSubscribe(device); addCatalogSubscribe(device, null);
} }
// 因为是异步执行需要在这里更新下数据
deviceMapper.updateSubscribeCatalog(device);
redisCatchStorage.updateDevice(device);
}); });
}else { }else {
// 开启订阅 // 开启订阅
device.setSubscribeCycleForCatalog(cycle); device.setSubscribeCycleForCatalog(cycle);
addCatalogSubscribe(device); addCatalogSubscribe(device, null);
deviceMapper.updateSubscribeCatalog(device); deviceMapper.updateSubscribeCatalog(device);
redisCatchStorage.updateDevice(device); redisCatchStorage.updateDevice(device);
} }
@ -598,21 +752,15 @@ public class DeviceServiceImpl implements IDeviceService {
device.setSubscribeCycleForMobilePosition(cycle); device.setSubscribeCycleForMobilePosition(cycle);
device.setMobilePositionSubmissionInterval(interval); device.setMobilePositionSubmissionInterval(interval);
if (cycle > 0) { if (cycle > 0) {
addMobilePositionSubscribe(device); addMobilePositionSubscribe(device, null);
} }
// 因为是异步执行需要在这里更新下数据
deviceMapper.updateSubscribeMobilePosition(device);
redisCatchStorage.updateDevice(device);
}); });
}else { }else {
// 订阅未开启 // 订阅未开启
device.setSubscribeCycleForMobilePosition(cycle); device.setSubscribeCycleForMobilePosition(cycle);
device.setMobilePositionSubmissionInterval(interval); device.setMobilePositionSubmissionInterval(interval);
// 开启订阅 // 开启订阅
addMobilePositionSubscribe(device); addMobilePositionSubscribe(device, null);
// 因为是异步执行需要在这里更新下数据
deviceMapper.updateSubscribeMobilePosition(device);
redisCatchStorage.updateDevice(device);
} }
} }

View File

@ -601,4 +601,12 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
public List<CommonGBChannel> queryChannelByPlatformIdAndChannelIds(Integer platformId, List<Integer> channelIds) { public List<CommonGBChannel> queryChannelByPlatformIdAndChannelIds(Integer platformId, List<Integer> channelIds) {
return platformChannelMapper.queryShare(platformId, channelIds); return platformChannelMapper.queryShare(platformId, channelIds);
} }
@Override
public List<Platform> queryByPlatformBySharChannelId(String channelDeviceId) {
CommonGBChannel commonGBChannel = commonGBChannelMapper.queryByDeviceId(channelDeviceId);
ArrayList<Integer> ids = new ArrayList<>();
ids.add(commonGBChannel.getGbId());
return platformChannelMapper.queryPlatFormListByChannelList(ids);
}
} }

View File

@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.gb28181.service.impl; package com.genersoft.iot.vmp.gb28181.service.impl;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.*; import com.genersoft.iot.vmp.common.*;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.SipConfig;
@ -950,4 +949,9 @@ public class PlatformServiceImpl implements IPlatformService {
// 删除平台信息 // 删除平台信息
platformMapper.delete(platform.getId()); platformMapper.delete(platform.getId());
} }
@Override
public List<Platform> queryAll() {
return platformMapper.queryAll();
}
} }

View File

@ -1,10 +0,0 @@
package com.genersoft.iot.vmp.gb28181.task;
import com.genersoft.iot.vmp.common.CommonCallback;
/**
* @author lin
*/
public interface ISubscribeTask extends Runnable{
void stop(CommonCallback<Boolean> callback);
}

View File

@ -0,0 +1,49 @@
package com.genersoft.iot.vmp.gb28181.task;
import com.genersoft.iot.vmp.common.SubscribeCallback;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import lombok.Data;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@Data
public abstract class SubscribeTask implements Delayed {
private String deviceId;
private SubscribeCallback callback;
private SipTransactionInfo transactionInfo;
/**
* 超时时间(单位 毫秒)
*/
private long delayTime;
public abstract void expired();
public abstract String getKey();
public abstract String getName();
@Override
public long getDelay(@NotNull TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(@NotNull Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
public SubscribeTaskInfo getInfo(){
SubscribeTaskInfo subscribeTaskInfo = new SubscribeTaskInfo();
subscribeTaskInfo.setName(getName());
subscribeTaskInfo.setTransactionInfo(transactionInfo);
subscribeTaskInfo.setDeviceId(deviceId);
subscribeTaskInfo.setKey(getKey());
return subscribeTaskInfo;
}
}

View File

@ -0,0 +1,22 @@
package com.genersoft.iot.vmp.gb28181.task;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import lombok.Data;
@Data
public class SubscribeTaskInfo {
private String deviceId;
private SipTransactionInfo transactionInfo;
private String name;
private String key;
/**
* 过期时间
*/
private long expireTime;
}

View File

@ -0,0 +1,135 @@
package com.genersoft.iot.vmp.gb28181.task;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class SubscribeTaskRunner{
private final Map<String, SubscribeTask> subscribes = new ConcurrentHashMap<>();
private final DelayQueue<SubscribeTask> delayQueue = new DelayQueue<>();
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private UserSetting userSetting;
private final String prefix = "VMP_DEVICE_SUBSCRIBE";
// 订阅过期检查
@Scheduled(fixedDelay = 500, timeUnit = TimeUnit.MILLISECONDS)
public void expirationCheck(){
while (!delayQueue.isEmpty()) {
SubscribeTask take = null;
try {
take = delayQueue.take();
try {
removeSubscribe(take.getKey());
take.expired();
}catch (Exception e) {
log.error("[设备订阅到期] {} 到期处理时出现异常, 设备编号: {} ", take.getName(), take.getDeviceId());
}
} catch (InterruptedException e) {
log.error("[设备订阅任务] ", e);
}
}
}
public void addSubscribe(SubscribeTask task) {
Duration duration = Duration.ofSeconds((task.getDelayTime() - System.currentTimeMillis())/1000);
if (duration.getSeconds() < 0) {
return;
}
subscribes.put(task.getKey(), task);
String key = String.format("%s_%s_%s", prefix, userSetting.getServerId(), task.getKey());
redisTemplate.opsForValue().set(key, task.getInfo(), duration);
delayQueue.offer(task);
}
public boolean removeSubscribe(String key) {
SubscribeTask task = subscribes.get(key);
if (task == null) {
return false;
}
String redisKey = String.format("%s_%s_%s", prefix, userSetting.getServerId(), task.getKey());
redisTemplate.delete(redisKey);
subscribes.remove(key);
if (delayQueue.contains(task)) {
boolean remove = delayQueue.remove(task);
if (!remove) {
log.info("[移除订阅任务] 从延时队列内移除失败: {}", key);
}
}
return true;
}
public SipTransactionInfo getTransactionInfo(String key) {
SubscribeTask task = subscribes.get(key);
if (task == null) {
return null;
}
return task.getTransactionInfo();
}
public boolean updateDelay(String key, long expirationTime) {
SubscribeTask task = subscribes.get(key);
if (task == null) {
return false;
}
log.info("[更新订阅任务时间] {}, 编号: {}", task.getName(), key);
if (delayQueue.contains(task)) {
boolean remove = delayQueue.remove(task);
if (!remove) {
log.info("[更新订阅任务时间] 从延时队列内移除失败: {}", key);
}
}
task.setDelayTime(expirationTime);
delayQueue.offer(task);
String redisKey = String.format("%s_%s_%s", prefix, userSetting.getServerId(), task.getKey());
Duration duration = Duration.ofSeconds((expirationTime - System.currentTimeMillis())/1000);
redisTemplate.expire(redisKey, duration);
return true;
}
public boolean containsKey(String key) {
return subscribes.containsKey(key);
}
public List<SubscribeTaskInfo> getAllTaskInfo(){
String scanKey = String.format("%s_%s_*", prefix, userSetting.getServerId());
List<Object> values = RedisUtil.scan(redisTemplate, scanKey);
if (values.isEmpty()) {
return new ArrayList<>();
}
List<SubscribeTaskInfo> result = new ArrayList<>();
for (Object value : values) {
String redisKey = (String)value;
SubscribeTaskInfo taskInfo = (SubscribeTaskInfo)redisTemplate.opsForValue().get(redisKey);
if (taskInfo == null) {
continue;
}
Long expire = redisTemplate.getExpire(redisKey);
taskInfo.setExpireTime(expire);
result.add(taskInfo);
}
return result;
}
}

View File

@ -1,107 +0,0 @@
package com.genersoft.iot.vmp.gb28181.task.impl;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
import javax.sip.DialogState;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import javax.sip.header.ToHeader;
import java.text.ParseException;
/**
* 目录订阅任务
* @author lin
*/
@Slf4j
public class CatalogSubscribeTask implements ISubscribeTask {
private final Device device;
private final ISIPCommander sipCommander;
private SIPRequest request;
private final DynamicTask dynamicTask;
private final String taskKey = "catalog-subscribe-timeout";
public CatalogSubscribeTask(Device device, ISIPCommander sipCommander, DynamicTask dynamicTask) {
this.device = device;
this.sipCommander = sipCommander;
this.dynamicTask = dynamicTask;
}
@Override
public void run() {
if (dynamicTask.get(taskKey) != null) {
dynamicTask.stop(taskKey);
}
SIPRequest sipRequest = null;
try {
sipRequest = sipCommander.catalogSubscribe(device, request, eventResult -> {
ResponseEvent event = (ResponseEvent) eventResult.event;
// 成功
log.info("[目录订阅]成功: {}", device.getDeviceId());
ToHeader toHeader = (ToHeader)event.getResponse().getHeader(ToHeader.NAME);
try {
this.request.getToHeader().setTag(toHeader.getTag());
} catch (ParseException e) {
log.info("[目录订阅]成功: 但为request设置ToTag失败");
this.request = null;
}
},eventResult -> {
this.request = null;
// 失败
log.warn("[目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
dynamicTask.startDelay(taskKey, CatalogSubscribeTask.this, 2000);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 目录订阅: {}", e.getMessage());
}
if (sipRequest != null) {
this.request = sipRequest;
}
}
@Override
public void stop(CommonCallback<Boolean> callback) {
/**
* dialog 的各个状态
* EARLY-> Early state状态-初始请求发送以后收到了一个临时响应消息
* CONFIRMED-> Confirmed Dialog状态-已确认
* COMPLETED-> Completed Dialog状态-已完成
* TERMINATED-> Terminated Dialog状态-终止
*/
log.info("取消目录订阅时dialog状态为{}", DialogState.CONFIRMED);
if (dynamicTask.get(taskKey) != null) {
dynamicTask.stop(taskKey);
}
device.setSubscribeCycleForCatalog(0);
try {
sipCommander.catalogSubscribe(device, request, eventResult -> {
ResponseEvent event = (ResponseEvent) eventResult.event;
if (event.getResponse().getRawContent() != null) {
// 成功
log.info("[取消目录订阅]成功: {}", device.getDeviceId());
}else {
// 成功
log.info("[取消目录订阅]成功: {}", device.getDeviceId());
}
if (callback != null) {
callback.run(event.getResponse().getRawContent() != null);
}
},eventResult -> {
// 失败
log.warn("[取消目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 取消目录订阅: {}", e.getMessage());
}
}
}

View File

@ -1,103 +0,0 @@
package com.genersoft.iot.vmp.gb28181.task.impl;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import javax.sip.header.ToHeader;
import java.text.ParseException;
/**
* 移动位置订阅的定时更新
* @author lin
*/
@Slf4j
public class MobilePositionSubscribeTask implements ISubscribeTask {
private final Device device;
private final ISIPCommander sipCommander;
private SIPRequest request;
private final DynamicTask dynamicTask;
private final String taskKey = "mobile-position-subscribe-timeout";
public MobilePositionSubscribeTask(Device device, ISIPCommander sipCommander, DynamicTask dynamicTask) {
this.device = device;
this.sipCommander = sipCommander;
this.dynamicTask = dynamicTask;
}
@Override
public void run() {
if (dynamicTask.get(taskKey) != null) {
dynamicTask.stop(taskKey);
}
SIPRequest sipRequest = null;
try {
sipRequest = sipCommander.mobilePositionSubscribe(device, request, eventResult -> {
// 成功
log.info("[移动位置订阅]成功: {}", device.getDeviceId());
ResponseEvent event = (ResponseEvent) eventResult.event;
ToHeader toHeader = (ToHeader)event.getResponse().getHeader(ToHeader.NAME);
try {
this.request.getToHeader().setTag(toHeader.getTag());
} catch (ParseException e) {
log.info("[移动位置订阅]成功: 为request设置ToTag失败");
this.request = null;
}
},eventResult -> {
this.request = null;
// 失败
log.warn("[移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
dynamicTask.startDelay(taskKey, MobilePositionSubscribeTask.this, 2000);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 移动位置订阅: {}", e.getMessage());
}
if (sipRequest != null) {
this.request = sipRequest;
}
}
@Override
public void stop(CommonCallback<Boolean> callback) {
/**
* dialog 的各个状态
* EARLY-> Early state状态-初始请求发送以后收到了一个临时响应消息
* CONFIRMED-> Confirmed Dialog状态-已确认
* COMPLETED-> Completed Dialog状态-已完成
* TERMINATED-> Terminated Dialog状态-终止
*/
if (dynamicTask.get(taskKey) != null) {
dynamicTask.stop(taskKey);
}
device.setSubscribeCycleForMobilePosition(0);
try {
sipCommander.mobilePositionSubscribe(device, request, eventResult -> {
ResponseEvent event = (ResponseEvent) eventResult.event;
if (event.getResponse().getRawContent() != null) {
// 成功
log.info("[取消移动位置订阅]成功: {}", device.getDeviceId());
}else {
// 成功
log.info("[取消移动位置订阅]成功: {}", device.getDeviceId());
}
if (callback != null) {
callback.run(event.getResponse().getRawContent() != null);
}
},eventResult -> {
// 失败
log.warn("[取消移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 取消移动位置订阅: {}", e.getMessage());
}
}
}

View File

@ -0,0 +1,48 @@
package com.genersoft.iot.vmp.gb28181.task.impl;
import com.genersoft.iot.vmp.common.SubscribeCallback;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.gb28181.task.SubscribeTask;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SubscribeTaskForCatalog extends SubscribeTask {
public static final String name = "catalog";
public static SubscribeTask getInstance(Device device, SubscribeCallback callback, SipTransactionInfo transactionInfo) {
if (device.getSubscribeCycleForCatalog() <= 0) {
return null;
}
SubscribeTaskForCatalog subscribeTaskForCatalog = new SubscribeTaskForCatalog();
subscribeTaskForCatalog.setDelayTime((device.getSubscribeCycleForCatalog() * 1000L - 500L) + System.currentTimeMillis());
subscribeTaskForCatalog.setDeviceId(device.getDeviceId());
subscribeTaskForCatalog.setCallback(callback);
subscribeTaskForCatalog.setTransactionInfo(transactionInfo);
return subscribeTaskForCatalog;
}
@Override
public void expired() {
if (super.getCallback() == null) {
log.info("[设备订阅到期] 目录订阅 未找到到期处理回调, 编号: {}", getDeviceId());
return;
}
getCallback().run(getDeviceId(), getTransactionInfo());
}
@Override
public String getKey() {
return String.format("%s_%s", name, getDeviceId());
}
@Override
public String getName() {
return name;
}
public static String getKey(Device device) {
return String.format("%s_%s", SubscribeTaskForCatalog.name, device.getDeviceId());
}
}

View File

@ -0,0 +1,48 @@
package com.genersoft.iot.vmp.gb28181.task.impl;
import com.genersoft.iot.vmp.common.SubscribeCallback;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.gb28181.task.SubscribeTask;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SubscribeTaskForMobilPosition extends SubscribeTask {
public static final String name = "mobilPosition";
public static SubscribeTask getInstance(Device device, SubscribeCallback callback, SipTransactionInfo transactionInfo) {
if (device.getSubscribeCycleForCatalog() <= 0) {
return null;
}
SubscribeTaskForMobilPosition subscribeTaskForMobilPosition = new SubscribeTaskForMobilPosition();
subscribeTaskForMobilPosition.setDelayTime((device.getSubscribeCycleForMobilePosition() * 1000L - 500L) + System.currentTimeMillis());
subscribeTaskForMobilPosition.setDeviceId(device.getDeviceId());
subscribeTaskForMobilPosition.setCallback(callback);
subscribeTaskForMobilPosition.setTransactionInfo(transactionInfo);
return subscribeTaskForMobilPosition;
}
@Override
public void expired() {
if (super.getCallback() == null) {
log.info("[设备订阅到期] 移动位置订阅 未找到到期处理回调, 编号: {}", getDeviceId());
return;
}
getCallback().run(getDeviceId(), getTransactionInfo());
}
@Override
public String getKey() {
return String.format("%s_%s", name, getDeviceId());
}
@Override
public String getName() {
return name;
}
public static String getKey(Device device) {
return String.format("%s_%s", SubscribeTaskForMobilPosition.name, device.getDeviceId());
}
}

View File

@ -17,16 +17,16 @@ import javax.sip.InvalidArgumentException;
import javax.sip.SipException; import javax.sip.SipException;
import java.text.ParseException; import java.text.ParseException;
/** /**
* @description:设备能力接口用于定义设备的控制查询能力 * @description:设备能力接口用于定义设备的控制查询能力
* @author: swwheihei * @author: swwheihei
* @date: 2020年5月3日 下午9:16:34 * @date: 2020年5月3日 下午9:16:34
*/ */
public interface ISIPCommander { public interface ISIPCommander {
/** /**
* 云台控制支持方向与缩放控制 * 云台控制支持方向与缩放控制
* *
* @param device 控制设备 * @param device 控制设备
* @param channelId 预览通道 * @param channelId 预览通道
* @param leftRight 镜头左移右移 0:停止 1:左移 2:右移 * @param leftRight 镜头左移右移 0:停止 1:左移 2:右移
@ -36,10 +36,10 @@ public interface ISIPCommander {
* @param zoomSpeed 镜头缩放速度 * @param zoomSpeed 镜头缩放速度
*/ */
void ptzCmd(Device device,String channelId,int leftRight, int upDown, int inOut, int moveSpeed, int zoomSpeed) throws InvalidArgumentException, SipException, ParseException; void ptzCmd(Device device,String channelId,int leftRight, int upDown, int inOut, int moveSpeed, int zoomSpeed) throws InvalidArgumentException, SipException, ParseException;
/** /**
* 前端控制包括PTZ指令FI指令预置位指令巡航指令扫描指令和辅助开关指令 * 前端控制包括PTZ指令FI指令预置位指令巡航指令扫描指令和辅助开关指令
* *
* @param device 控制设备 * @param device 控制设备
* @param channelId 预览通道 * @param channelId 预览通道
* @param cmdCode 指令码 * @param cmdCode 指令码
@ -48,7 +48,7 @@ public interface ISIPCommander {
* @param combineCode2 组合码2 * @param combineCode2 组合码2
*/ */
void frontEndCmd(Device device, String channelId, int cmdCode, int parameter1, int parameter2, int combineCode2) throws SipException, InvalidArgumentException, ParseException; void frontEndCmd(Device device, String channelId, int cmdCode, int parameter1, int parameter2, int combineCode2) throws SipException, InvalidArgumentException, ParseException;
/** /**
* 前端控制指令用于转发上级指令 * 前端控制指令用于转发上级指令
* @param device 控制设备 * @param device 控制设备
@ -66,7 +66,7 @@ public interface ISIPCommander {
/** /**
* 请求回放视频流 * 请求回放视频流
* *
* @param device 视频设备 * @param device 视频设备
* @param channel 预览通道 * @param channel 预览通道
* @param startTime 开始时间,格式要求yyyy-MM-dd HH:mm:ss * @param startTime 开始时间,格式要求yyyy-MM-dd HH:mm:ss
@ -76,13 +76,13 @@ public interface ISIPCommander {
/** /**
* 请求历史媒体下载 * 请求历史媒体下载
* *
* @param device 视频设备 * @param device 视频设备
* @param channel 预览通道 * @param channel 预览通道
* @param startTime 开始时间,格式要求yyyy-MM-dd HH:mm:ss * @param startTime 开始时间,格式要求yyyy-MM-dd HH:mm:ss
* @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss * @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss
* @param downloadSpeed 下载倍速参数 * @param downloadSpeed 下载倍速参数
*/ */
void downloadStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, void downloadStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel,
String startTime, String endTime, int downloadSpeed, String startTime, String endTime, int downloadSpeed,
SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException; SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException;
@ -116,7 +116,7 @@ public interface ISIPCommander {
* 回放倍速播放 * 回放倍速播放
*/ */
void playSpeedCmd(Device device, DeviceChannel channel, StreamInfo streamInfo, Double speed) throws InvalidArgumentException, ParseException, SipException; void playSpeedCmd(Device device, DeviceChannel channel, StreamInfo streamInfo, Double speed) throws InvalidArgumentException, ParseException, SipException;
/** /**
* 回放控制 * 回放控制
* @param device * @param device
@ -138,39 +138,39 @@ public interface ISIPCommander {
/** /**
* 音视频录像控制 * 音视频录像控制
* *
* @param device 视频设备 * @param device 视频设备
* @param channelId 预览通道 * @param channelId 预览通道
* @param recordCmdStr 录像命令Record / StopRecord * @param recordCmdStr 录像命令Record / StopRecord
*/ */
void recordCmd(Device device, String channelId, String recordCmdStr, ErrorCallback<String> callback) throws InvalidArgumentException, SipException, ParseException; void recordCmd(Device device, String channelId, String recordCmdStr, ErrorCallback<String> callback) throws InvalidArgumentException, SipException, ParseException;
/** /**
* 远程启动控制命令 * 远程启动控制命令
* *
* @param device 视频设备 * @param device 视频设备
*/ */
void teleBootCmd(Device device) throws InvalidArgumentException, SipException, ParseException; void teleBootCmd(Device device) throws InvalidArgumentException, SipException, ParseException;
/** /**
* 报警布防/撤防命令 * 报警布防/撤防命令
* *
* @param device 视频设备 * @param device 视频设备
*/ */
void guardCmd(Device device, String guardCmdStr, ErrorCallback<String> callback) throws InvalidArgumentException, SipException, ParseException; void guardCmd(Device device, String guardCmdStr, ErrorCallback<String> callback) throws InvalidArgumentException, SipException, ParseException;
/** /**
* 报警复位命令 * 报警复位命令
* *
* @param device 视频设备 * @param device 视频设备
* @param alarmMethod 报警方式可选 * @param alarmMethod 报警方式可选
* @param alarmType 报警类型可选 * @param alarmType 报警类型可选
*/ */
void alarmResetCmd(Device device, String alarmMethod, String alarmType, ErrorCallback<String> callback) throws InvalidArgumentException, SipException, ParseException; void alarmResetCmd(Device device, String alarmMethod, String alarmType, ErrorCallback<String> callback) throws InvalidArgumentException, SipException, ParseException;
/** /**
* 强制关键帧命令,设备收到此命令应立刻发送一个IDR帧 * 强制关键帧命令,设备收到此命令应立刻发送一个IDR帧
* *
* @param device 视频设备 * @param device 视频设备
* @param channelId 预览通道 * @param channelId 预览通道
*/ */
@ -184,11 +184,11 @@ public interface ISIPCommander {
/** /**
* 设备配置命令 * 设备配置命令
* *
* @param device 视频设备 * @param device 视频设备
*/ */
void deviceConfigCmd(Device device); void deviceConfigCmd(Device device);
/** /**
* 设备配置命令basicParam * 设备配置命令basicParam
*/ */
@ -196,11 +196,11 @@ public interface ISIPCommander {
/** /**
* 查询设备状态 * 查询设备状态
* *
* @param device 视频设备 * @param device 视频设备
*/ */
void deviceStatusQuery(Device device, ErrorCallback<String> callback) throws InvalidArgumentException, SipException, ParseException; void deviceStatusQuery(Device device, ErrorCallback<String> callback) throws InvalidArgumentException, SipException, ParseException;
/** /**
* 查询设备信息 * 查询设备信息
* *
@ -209,27 +209,27 @@ public interface ISIPCommander {
* @return * @return
*/ */
void deviceInfoQuery(Device device, ErrorCallback<Object> callback) throws InvalidArgumentException, SipException, ParseException; void deviceInfoQuery(Device device, ErrorCallback<Object> callback) throws InvalidArgumentException, SipException, ParseException;
/** /**
* 查询目录列表 * 查询目录列表
* *
* @param device 视频设备 * @param device 视频设备
*/ */
void catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent) throws SipException, InvalidArgumentException, ParseException; void catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent) throws SipException, InvalidArgumentException, ParseException;
/** /**
* 查询录像信息 * 查询录像信息
* *
* @param device 视频设备 * @param device 视频设备
* @param startTime 开始时间,格式要求yyyy-MM-dd HH:mm:ss * @param startTime 开始时间,格式要求yyyy-MM-dd HH:mm:ss
* @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss * @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss
* @param sn * @param sn
*/ */
void recordInfoQuery(Device device, String channelId, String startTime, String endTime, int sn, Integer Secrecy, String type, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; void recordInfoQuery(Device device, String channelId, String startTime, String endTime, int sn, Integer Secrecy, String type, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
/** /**
* 查询报警信息 * 查询报警信息
* *
* @param device 视频设备 * @param device 视频设备
* @param startPriority 报警起始级别可选 * @param startPriority 报警起始级别可选
* @param endPriority 报警终止级别可选 * @param endPriority 报警终止级别可选
@ -241,37 +241,37 @@ public interface ISIPCommander {
*/ */
void alarmInfoQuery(Device device, String startPriority, String endPriority, String alarmMethod, void alarmInfoQuery(Device device, String startPriority, String endPriority, String alarmMethod,
String alarmType, String startTime, String endTime, ErrorCallback<Object> callback) throws InvalidArgumentException, SipException, ParseException; String alarmType, String startTime, String endTime, ErrorCallback<Object> callback) throws InvalidArgumentException, SipException, ParseException;
/** /**
* 查询设备配置 * 查询设备配置
* *
* @param device 视频设备 * @param device 视频设备
* @param channelId 通道编码可选 * @param channelId 通道编码可选
* @param configType 配置类型 * @param configType 配置类型
*/ */
void deviceConfigQuery(Device device, String channelId, String configType, ErrorCallback<Object> callback) throws InvalidArgumentException, SipException, ParseException; void deviceConfigQuery(Device device, String channelId, String configType, ErrorCallback<Object> callback) throws InvalidArgumentException, SipException, ParseException;
/** /**
* 查询设备预置位置 * 查询设备预置位置
* *
* @param device 视频设备 * @param device 视频设备
*/ */
void presetQuery(Device device, String channelId, ErrorCallback<Object> callback) throws InvalidArgumentException, SipException, ParseException; void presetQuery(Device device, String channelId, ErrorCallback<Object> callback) throws InvalidArgumentException, SipException, ParseException;
/** /**
* 查询移动设备位置数据 * 查询移动设备位置数据
* *
* @param device 视频设备 * @param device 视频设备
*/ */
void mobilePostitionQuery(Device device, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; void mobilePostitionQuery(Device device, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
/** /**
* 订阅取消订阅移动位置 * 订阅取消订阅移动位置
* *
* @param device 视频设备 * @param device 视频设备
* @return true = 命令发送成功 * @return true = 命令发送成功
*/ */
SIPRequest mobilePositionSubscribe(Device device, SIPRequest request, SipSubscribe.Event okEvent , SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; SIPRequest mobilePositionSubscribe(Device device, SipTransactionInfo transactionInfo, SipSubscribe.Event okEvent , SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
/** /**
* 订阅取消订阅报警信息 * 订阅取消订阅报警信息
@ -290,7 +290,7 @@ public interface ISIPCommander {
* @param device 视频设备 * @param device 视频设备
* @return true = 命令发送成功 * @return true = 命令发送成功
*/ */
SIPRequest catalogSubscribe(Device device, SIPRequest request, SipSubscribe.Event okEvent ,SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; SIPRequest catalogSubscribe(Device device, SipTransactionInfo transactionInfo, SipSubscribe.Event okEvent ,SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
/** /**
* 拉框控制命令 * 拉框控制命令

View File

@ -33,7 +33,7 @@ public class SIPRequestHeaderProvider {
@Autowired @Autowired
private SipConfig sipConfig; private SipConfig sipConfig;
@Autowired @Autowired
private SipLayer sipLayer; private SipLayer sipLayer;
@ -43,7 +43,7 @@ public class SIPRequestHeaderProvider {
@Autowired @Autowired
private IRedisCatchStorage redisCatchStorage; private IRedisCatchStorage redisCatchStorage;
public Request createMessageRequest(Device device, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException { public Request createMessageRequest(Device device, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
Request request = null; Request request = null;
// sipuri // sipuri
@ -76,7 +76,7 @@ public class SIPRequestHeaderProvider {
request.setContent(content, contentTypeHeader); request.setContent(content, contentTypeHeader);
return request; return request;
} }
public Request createInviteRequest(Device device, String channelId, String content, String viaTag, String fromTag, String toTag, String ssrc, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException { public Request createInviteRequest(Device device, String channelId, String content, String viaTag, String fromTag, String toTag, String ssrc, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
Request request = null; Request request = null;
//请求行 //请求行
@ -96,10 +96,10 @@ public class SIPRequestHeaderProvider {
SipURI toSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(channelId, device.getHostAddress()); SipURI toSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(channelId, device.getHostAddress());
Address toAddress = SipFactory.getInstance().createAddressFactory().createAddress(toSipURI); Address toAddress = SipFactory.getInstance().createAddressFactory().createAddress(toSipURI);
ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress,null); ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress,null);
//Forwards //Forwards
MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70); MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70);
//ceq //ceq
CSeqHeader cSeqHeader = SipFactory.getInstance().createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.INVITE); CSeqHeader cSeqHeader = SipFactory.getInstance().createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.INVITE);
request = SipFactory.getInstance().createMessageFactory().createRequest(requestLine, Request.INVITE, callIdHeader, cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards); request = SipFactory.getInstance().createMessageFactory().createRequest(requestLine, Request.INVITE, callIdHeader, cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards);
@ -116,7 +116,7 @@ public class SIPRequestHeaderProvider {
request.setContent(content, contentTypeHeader); request.setContent(content, contentTypeHeader);
return request; return request;
} }
public Request createPlaybackInviteRequest(Device device, String channelId, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader, String ssrc) throws ParseException, InvalidArgumentException, PeerUnavailableException { public Request createPlaybackInviteRequest(Device device, String channelId, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader, String ssrc) throws ParseException, InvalidArgumentException, PeerUnavailableException {
Request request = null; Request request = null;
//请求行 //请求行
@ -134,14 +134,14 @@ public class SIPRequestHeaderProvider {
SipURI toSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(channelId, device.getHostAddress()); SipURI toSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(channelId, device.getHostAddress());
Address toAddress = SipFactory.getInstance().createAddressFactory().createAddress(toSipURI); Address toAddress = SipFactory.getInstance().createAddressFactory().createAddress(toSipURI);
ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress,null); ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress,null);
//Forwards //Forwards
MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70); MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70);
//ceq //ceq
CSeqHeader cSeqHeader = SipFactory.getInstance().createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.INVITE); CSeqHeader cSeqHeader = SipFactory.getInstance().createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.INVITE);
request = SipFactory.getInstance().createMessageFactory().createRequest(requestLine, Request.INVITE, callIdHeader, cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards); request = SipFactory.getInstance().createMessageFactory().createRequest(requestLine, Request.INVITE, callIdHeader, cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards);
Address concatAddress = SipFactory.getInstance().createAddressFactory().createAddress(SipFactory.getInstance().createAddressFactory().createSipURI(sipConfig.getId(), sipLayer.getLocalIp(device.getLocalIp())+":"+sipConfig.getPort())); Address concatAddress = SipFactory.getInstance().createAddressFactory().createAddress(SipFactory.getInstance().createAddressFactory().createSipURI(sipConfig.getId(), sipLayer.getLocalIp(device.getLocalIp())+":"+sipConfig.getPort()));
// Address concatAddress = SipFactory.getInstance().createAddressFactory().createAddress(SipFactory.getInstance().createAddressFactory().createSipURI(sipConfig.getId(), device.getHost().getIp()+":"+device.getHost().getPort())); // Address concatAddress = SipFactory.getInstance().createAddressFactory().createAddress(SipFactory.getInstance().createAddressFactory().createSipURI(sipConfig.getId(), device.getHost().getIp()+":"+device.getHost().getPort()));
request.addHeader(SipFactory.getInstance().createHeaderFactory().createContactHeader(concatAddress)); request.addHeader(SipFactory.getInstance().createHeaderFactory().createContactHeader(concatAddress));
@ -231,7 +231,7 @@ public class SIPRequestHeaderProvider {
return request; return request;
} }
public Request createSubscribeRequest(Device device, String content, SIPRequest requestOld, Integer expires, String event, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException { public Request createSubscribeRequest(Device device, String content, SipTransactionInfo sipTransactionInfo, Integer expires, String event, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
Request request = null; Request request = null;
// sipuri // sipuri
SipURI requestURI = SipFactory.getInstance().createAddressFactory().createSipURI(device.getDeviceId(), device.getHostAddress()); SipURI requestURI = SipFactory.getInstance().createAddressFactory().createSipURI(device.getDeviceId(), device.getHostAddress());
@ -244,11 +244,11 @@ public class SIPRequestHeaderProvider {
// from // from
SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getDomain()); SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getDomain());
Address fromAddress = SipFactory.getInstance().createAddressFactory().createAddress(fromSipURI); Address fromAddress = SipFactory.getInstance().createAddressFactory().createAddress(fromSipURI);
FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, requestOld == null ? SipUtils.getNewFromTag() :requestOld.getFromTag()); FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, sipTransactionInfo == null ? SipUtils.getNewFromTag() :sipTransactionInfo.getFromTag());
// to // to
SipURI toSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(device.getDeviceId(), device.getHostAddress()); SipURI toSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(device.getDeviceId(), device.getHostAddress());
Address toAddress = SipFactory.getInstance().createAddressFactory().createAddress(toSipURI); Address toAddress = SipFactory.getInstance().createAddressFactory().createAddress(toSipURI);
ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress, requestOld == null ? null :requestOld.getToTag()); ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress, sipTransactionInfo == null ? null :sipTransactionInfo.getToTag());
// Forwards // Forwards
MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70); MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70);

View File

@ -1179,7 +1179,7 @@ public class SIPCommander implements ISIPCommander {
* @return true = 命令发送成功 * @return true = 命令发送成功
*/ */
@Override @Override
public SIPRequest mobilePositionSubscribe(Device device, SIPRequest requestOld, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { public SIPRequest mobilePositionSubscribe(Device device, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
StringBuffer subscribePostitionXml = new StringBuffer(200); StringBuffer subscribePostitionXml = new StringBuffer(200);
String charset = device.getCharset(); String charset = device.getCharset();
@ -1197,12 +1197,12 @@ public class SIPCommander implements ISIPCommander {
CallIdHeader callIdHeader; CallIdHeader callIdHeader;
if (requestOld != null) { if (sipTransactionInfo != null) {
callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(requestOld.getCallIdHeader().getCallId()); callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(sipTransactionInfo.getCallId());
} else { } else {
callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()); callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport());
} }
SIPRequest request = (SIPRequest) headerProvider.createSubscribeRequest(device, subscribePostitionXml.toString(), requestOld, device.getSubscribeCycleForMobilePosition(), "presence",callIdHeader); //Position;id=" + tm.substring(tm.length() - 4)); SIPRequest request = (SIPRequest) headerProvider.createSubscribeRequest(device, subscribePostitionXml.toString(), sipTransactionInfo, device.getSubscribeCycleForMobilePosition(), "presence",callIdHeader); //Position;id=" + tm.substring(tm.length() - 4));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, okEvent); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, okEvent);
return request; return request;
@ -1255,7 +1255,7 @@ public class SIPCommander implements ISIPCommander {
} }
@Override @Override
public SIPRequest catalogSubscribe(Device device, SIPRequest requestOld, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { public SIPRequest catalogSubscribe(Device device, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
StringBuffer cmdXml = new StringBuffer(200); StringBuffer cmdXml = new StringBuffer(200);
String charset = device.getCharset(); String charset = device.getCharset();
@ -1268,14 +1268,14 @@ public class SIPCommander implements ISIPCommander {
CallIdHeader callIdHeader; CallIdHeader callIdHeader;
if (requestOld != null) { if (sipTransactionInfo != null) {
callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(requestOld.getCallIdHeader().getCallId()); callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(sipTransactionInfo.getCallId());
} else { } else {
callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()); callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport());
} }
// 有效时间默认为60秒以上 // 有效时间默认为60秒以上
SIPRequest request = (SIPRequest) headerProvider.createSubscribeRequest(device, cmdXml.toString(), requestOld, device.getSubscribeCycleForCatalog(), "Catalog", SIPRequest request = (SIPRequest) headerProvider.createSubscribeRequest(device, cmdXml.toString(), sipTransactionInfo, device.getSubscribeCycleForCatalog(), "Catalog",
callIdHeader); callIdHeader);
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, okEvent); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, okEvent);
return request; return request;

View File

@ -97,10 +97,6 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
} }
Device device = sipMsgInfo.getDevice(); Device device = sipMsgInfo.getDevice();
SIPRequest request = (SIPRequest) evt.getRequest(); SIPRequest request = (SIPRequest) evt.getRequest();
// if (!ObjectUtils.isEmpty(device.getKeepaliveTime()) && DateUtil.getDifferenceForNow(device.getKeepaliveTime()) <= 3000L) {
// log.info("[收到心跳] 心跳发送过于频繁,已忽略 device: {}, callId: {}", device.getDeviceId(), request.getCallIdHeader().getCallId());
// return;
// }
RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress()); RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress());
if (!device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) { if (!device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) {
@ -109,12 +105,6 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
device.setHostAddress(remoteAddressInfo.getIp().concat(":").concat(String.valueOf(remoteAddressInfo.getPort()))); device.setHostAddress(remoteAddressInfo.getIp().concat(":").concat(String.valueOf(remoteAddressInfo.getPort())));
device.setIp(remoteAddressInfo.getIp()); device.setIp(remoteAddressInfo.getIp());
device.setLocalIp(request.getLocalAddress().getHostAddress()); device.setLocalIp(request.getLocalAddress().getHostAddress());
// 设备地址变化会引起目录订阅任务失效需要重新添加
if (device.getSubscribeCycleForCatalog() > 0) {
deviceService.removeCatalogSubscribe(device, result -> {
deviceService.addCatalogSubscribe(device);
});
}
} }
device.setKeepaliveTime(DateUtil.getNow()); device.setKeepaliveTime(DateUtil.getNow());

View File

@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService; import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@ -55,6 +56,9 @@ public class RedisAlarmMsgListener implements MessageListener {
@Autowired @Autowired
private IPlatformService platformService; private IPlatformService platformService;
@Autowired
private IPlatformChannelService platformChannelService;
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Autowired @Autowired
@ -145,22 +149,25 @@ public class RedisAlarmMsgListener implements MessageListener {
} }
} else { } else {
Device device = deviceService.getDeviceByDeviceId(gbId); // 获取该通道ID是属于设备还是对应的上级平台
Platform platform = platformService.queryPlatformByServerGBId(gbId); Device device = deviceService.getDeviceBySourceChannelDeviceId(gbId);
if (device != null && platform == null) { List<Platform> platforms = platformChannelService.queryByPlatformBySharChannelId(gbId);
if (device != null && (platforms == null || platforms.isEmpty())) {
try { try {
commander.sendAlarmMessage(device, deviceAlarm); commander.sendAlarmMessage(device, deviceAlarm);
} catch (InvalidArgumentException | SipException | ParseException e) { } catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 发送报警: {}", e.getMessage()); log.error("[命令发送失败] 发送报警: {}", e.getMessage());
} }
} else if (device == null && platform != null) { } else if (device == null && (platforms != null && !platforms.isEmpty())) {
try { for (Platform platform : platforms) {
commanderForPlatform.sendAlarmMessage(platform, deviceAlarm); try {
} catch (InvalidArgumentException | SipException | ParseException e) { commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);
log.error("[命令发送失败] 发送报警: {}", e.getMessage()); } catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 发送报警: {}", e.getMessage());
}
} }
} else { } else {
log.warn("无法确定" + gbId + "是平台还是设备"); log.warn("[REDIS的ALARM通知] 未查询到" + gbId + "所属的平台或设备");
} }
} }
} catch (Exception e) { } catch (Exception e) {