Compare commits

...

11 Commits

Author SHA1 Message Date
阿斌
2ab4c40212
Pre Merge pull request !36 from 阿斌/N/A 2025-06-03 09:17:19 +00:00
lin
227239f7f7 去除调试日志 2025-06-03 17:17:04 +08:00
lin
e3f880627e 去除多余数据库脚本 2025-06-03 16:53:36 +08:00
lin
f4bbca78e5 优化通道变化消息发送以及增加设备状态丢失检测 2025-06-03 16:52:42 +08:00
lin
cc0c73a64d Merge branch 'master' into dev/设备和平台使用不同的优化策略 2025-06-03 11:31:38 +08:00
lin
46d30a8fe2 Merge branch 'master' into dev/设备和平台使用不同的优化策略
# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java
#	数据库/2.7.4/初始化-mysql-2.7.4.sql
#	数据库/2.7.4/初始化-postgresql-kingbase-2.7.4.sql
2025-05-29 15:31:52 +08:00
lin
ca37162dd5 Merge branch 'master' into dev/设备和平台使用不同的优化策略
# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java
2025-05-26 09:09:36 +08:00
lin
d524905edb 增加设备和平台在线状态不同的同步策略 2025-05-19 15:52:56 +08:00
lin
f7f3ed4c5b Merge branch 'master' into dev/设备和平台使用不同的优化策略 2025-05-19 14:42:44 +08:00
lin
5224f2b5c0 增加数据库脚本 2025-05-19 10:39:49 +08:00
阿斌
da98101aac
update src/main/resources/civilCode.csv.
行政规划错误。江苏南通海门市,修改为海门区,浙江杭州删除下城区、江干区,新增钱塘区,临平区

Signed-off-by: 阿斌 <38912748@qq.com>
2024-12-15 08:58:42 +00:00
21 changed files with 112 additions and 58 deletions

View File

@ -1,23 +0,0 @@
package com.genersoft.iot.vmp.gb28181.bean;
/**
* 国标类型编码,国标编码中11-13位为类型编码
* 详见 D 编码规则 A
* @author lin
*/
public class ChannelIdType {
/**
* 中心信令控制服务器编码
*/
public final static String CENTRAL_SIGNALING_CONTROL_SERVER = "200";
/**
* 业务分组编码
*/
public final static String BUSINESS_GROUP = "215";
/**
* 虚拟组织编码
*/
public final static String VIRTUAL_ORGANIZATION = "216";
}

View File

@ -26,19 +26,19 @@ public class Device {
*/
@Schema(description = "名称")
private String name;
/**
* 生产厂商
*/
@Schema(description = "生产厂商")
private String manufacturer;
/**
* 型号
*/
@Schema(description = "型号")
private String model;
/**
* 固件版本
*/
@ -78,7 +78,7 @@ public class Device {
*/
@Schema(description = "wan地址")
private String hostAddress;
/**
* 在线
*/

View File

@ -123,7 +123,12 @@ public class DeviceQuery {
log.debug("设备通道信息同步API调用deviceId" + deviceId);
}
Device device = deviceService.getDeviceByDeviceId(deviceId);
if (device.getRegisterTime() == null) {
WVPResult<SyncStatus> wvpResult = new WVPResult<>();
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg("设备尚未注册过");
return wvpResult;
}
return deviceService.devicesSync(device);
}

View File

@ -578,4 +578,8 @@ public interface CommonGBChannelMapper {
" <foreach collection='channelIdsForClear' item='item' open='(' separator=',' close=')' > #{item}</foreach>" +
" </script>"})
void removeParentIdByChannelIds(List<Integer> channelIdsForClear);
@SelectProvider(type = ChannelProvider.class, method = "queryOnlineListsByGbDeviceId")
List<CommonGBChannel> queryOnlineListsByGbDeviceId(@Param("deviceId") int deviceId);
}

View File

@ -666,4 +666,8 @@ public interface DeviceChannelMapper {
" where data_type = 1 and data_device_id=#{dataDeviceId} and device_id = #{channelId}" +
" </script>"})
DeviceChannel getOneBySourceChannelId(@Param("dataDeviceId") int dataDeviceId, @Param("channelId") String channelId);
@Update(value = {"UPDATE wvp_device_channel SET status = 'OFF' WHERE data_type = 1 and data_device_id=#{deviceId}"})
void offlineByDeviceId(@Param("deviceId") int deviceId);
}

View File

@ -97,7 +97,7 @@ public class ChannelProvider {
" coalesce(wdc.gb_svc_time_support_mode, wdc.svc_time_support_mode) as gb_svc_time_support_mode\n" +
" from wvp_device_channel wdc\n"
;
private final static String BASE_SQL_FOR_PLATFORM =
"select\n" +
" wdc.id as gb_id,\n" +
@ -455,6 +455,13 @@ public class ChannelProvider {
return sqlBuild.toString();
}
public String queryOnlineListsByGbDeviceId(Map<String, Object> params ){
StringBuilder sqlBuild = new StringBuilder();
sqlBuild.append(BASE_SQL_TABLE_NAME);
sqlBuild.append(" where wdc.channel_type = 0 AND coalesce(wdc.gb_status, wdc.status) = 'ON' AND wdc.data_type = 1 AND data_device_id = #{deviceId}");
return sqlBuild.toString();
}
public String queryAllForUnusualCivilCode(Map<String, Object> params ){
StringBuilder sqlBuild = new StringBuilder();
sqlBuild.append("select wdc.id from wvp_device_channel wdc ");

View File

@ -68,11 +68,7 @@ public class EventPublisher {
deviceChannelList.add(deviceChannel);
catalogEventPublish(platform, deviceChannelList, type);
}
public void catalogEventPublish(Platform platform, List<CommonGBChannel> deviceChannels, String type) {
catalogEventPublish(platform, deviceChannels, type, true);
}
public void catalogEventPublish(Platform platform, List<CommonGBChannel> deviceChannels, String type, boolean share) {
if (platform != null && !userSetting.getServerId().equals(platform.getServerId())) {
log.info("[国标级联] 目录状态推送, 此上级平台由其他服务处理,消息已经忽略");
return;

View File

@ -47,7 +47,7 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
public void onApplicationEvent(CatalogEvent event) {
SubscribeInfo subscribe = null;
Platform parentPlatform = null;
log.info("[Catalog事件: {}] 通道数量: {}", event.getType(), event.getChannels().size());
log.info("[Catalog事件: {}]通道数量: {}", event.getType(), event.getChannels().size());
Map<String, List<Platform>> platformMap = new HashMap<>();
Map<String, CommonGBChannel> channelMap = new HashMap<>();
if (event.getPlatform() != null) {

View File

@ -224,7 +224,6 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
@Override
public List<Device> getDeviceByChannelId(String channelId) {
return channelMapper.getDeviceByChannelDeviceId(channelId);
}
@ -574,7 +573,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
deviceChannel.setStreamId(channelInDb.getStreamId());
deviceChannel.setHasAudio(channelInDb.isHasAudio());
deviceChannel.setId(channelInDb.getId());
if (channelInDb.getStatus() != null && channelInDb.getStatus().equalsIgnoreCase(deviceChannel.getStatus())){
if (channelInDb.getStatus() != null && !channelInDb.getStatus().equalsIgnoreCase(deviceChannel.getStatus())){
List<Platform> platformList = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getDeviceId());
if (!CollectionUtils.isEmpty(platformList)){
platformList.forEach(platform->{

View File

@ -3,13 +3,15 @@ package com.genersoft.iot.vmp.gb28181.service.impl;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.enums.ChannelDataType;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.dao.CommonGBChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
@ -64,9 +66,6 @@ import java.util.concurrent.TimeUnit;
@Order(value=16)
public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Autowired
private DynamicTask dynamicTask;
@Autowired
private ISIPCommander sipCommander;
@ -88,6 +87,12 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Autowired
private DeviceChannelMapper deviceChannelMapper;
@Autowired
private CommonGBChannelMapper commonGBChannelMapper;
@Autowired
private EventPublisher eventPublisher;
@Autowired
private ISendRtpServerService sendRtpServerService;
@ -156,7 +161,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
// 恢复定时任务, TCP因为连接已经断开必须等待设备重新连接
DeviceStatusTask deviceStatusTask = DeviceStatusTask.getInstance(taskInfo.getDeviceId(),
taskInfo.getTransactionInfo(), taskInfo.getExpireTime(), this::deviceStatusExpire);
taskInfo.getTransactionInfo(), taskInfo.getExpireTime() + 1000 + System.currentTimeMillis(), this::deviceStatusExpire);
deviceStatusTaskRunner.addTask(deviceStatusTask);
onlineDeviceIds.add(taskInfo.getDeviceId());
}
@ -238,8 +243,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
subscribeTaskRunner.removeSubscribe(SubscribeTaskForMobilPosition.getKey(device));
}
//进行通道离线
// deviceChannelMapper.offlineByDeviceId(deviceId);
// 离线释放所有ssrc
List<SsrcTransaction> ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(device.getDeviceId());
if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) {
@ -308,7 +311,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
device.setCreateTime(now);
device.setUpdateTime(now);
log.info("[设备上线,首次注册]: {},查询设备信息以及通道信息", device.getDeviceId());
addCustomDevice(device);
if(device.getStreamMode() == null) {
device.setStreamMode("TCP-PASSIVE");
}
deviceMapper.add(device);
redisCatchStorage.updateDevice(device);
try {
@ -333,7 +338,10 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
log.error("[命令发送失败] 查询设备信息: {}", e.getMessage());
}
sync(device);
// TODO 如果设备下的通道级联到了其他平台那么需要发送事件或者notify给上级平台
}else {
if (isDevice(device.getDeviceId())) {
sync(device);
}
}
// 上线添加订阅
if (device.getSubscribeCycleForCatalog() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForCatalog.getKey(device))) {
@ -361,20 +369,21 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
if (deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
if (sipTransactionInfo == null) {
deviceStatusTaskRunner.updateDelay(device.getDeviceId(), System.currentTimeMillis() + expiresTime);
deviceStatusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis());
}else {
deviceStatusTaskRunner.removeTask(device.getDeviceId());
DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime, this::deviceStatusExpire);
DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime + System.currentTimeMillis(), this::deviceStatusExpire);
deviceStatusTaskRunner.addTask(task);
}
}else {
DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime, this::deviceStatusExpire);
DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime + System.currentTimeMillis(), this::deviceStatusExpire);
deviceStatusTaskRunner.addTask(task);
}
}
@Override
@Transactional
public void offline(String deviceId, String reason) {
Device device = getDeviceByDeviceIdFromDb(deviceId);
if (device == null) {
@ -397,11 +406,38 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
cleanOfflineDevice(device);
redisCatchStorage.updateDevice(device);
deviceMapper.update(device);
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, false);
}
if (isDevice(deviceId)) {
channelOfflineByDevice(device);
}
}
private void channelOfflineByDevice(Device device) {
// 进行通道离线
List<CommonGBChannel> channelList = commonGBChannelMapper.queryOnlineListsByGbDeviceId(device.getId());
if (channelList.isEmpty()) {
return;
}
deviceChannelMapper.offlineByDeviceId(device.getId());
// 发送通道离线通知
eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.OFF);
}
private boolean isDevice(String deviceId) {
GbCode decode = GbCode.decode(deviceId);
if (decode == null) {
return true;
}
int code = Integer.parseInt(decode.getTypeCode());
return code <= 199;
}
// 订阅丢失检查
@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.SECONDS)
public void lostCheck(){
public void lostCheckForSubscribe(){
// 获取所有设备
List<Device> deviceList = redisCatchStorage.getAllDevices();
if (deviceList.isEmpty()) {
@ -422,6 +458,25 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
}
// 设备状态丢失检查
@Scheduled(fixedDelay = 30, timeUnit = TimeUnit.SECONDS)
public void lostCheckForStatus(){
// 获取所有设备
List<Device> deviceList = redisCatchStorage.getAllDevices();
if (deviceList.isEmpty()) {
return;
}
for (Device device : deviceList) {
if (device == null || !device.isOnLine() || !userSetting.getServerId().equals(device.getServerId())) {
continue;
}
if (!deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
log.debug("[状态丢失] 执行设备离线, 编号: {},", device.getDeviceId());
offline(device.getDeviceId(), "");
}
}
}
private void catalogSubscribeExpire(String deviceId, SipTransactionInfo transactionInfo) {
log.info("[目录订阅] 到期, 编号: {}", deviceId);
Device device = getDeviceByDeviceId(deviceId);

View File

@ -28,7 +28,7 @@ public class DeviceStatusTask implements Delayed {
DeviceStatusTask deviceStatusTask = new DeviceStatusTask();
deviceStatusTask.setDeviceId(deviceId);
deviceStatusTask.setTransactionInfo(transactionInfo);
deviceStatusTask.setDelayTime((delayTime * 1000L - 500L) + System.currentTimeMillis());
deviceStatusTask.setDelayTime(delayTime);
deviceStatusTask.setCallback(callback);
return deviceStatusTask;
}

View File

@ -93,7 +93,7 @@ public class DeviceStatusTaskRunner {
if (task == null) {
return false;
}
log.info("[更新状态任务时间] 编号: {}", key);
log.debug("[更新状态任务时间] 编号: {}", key);
if (delayQueue.contains(task)) {
boolean remove = delayQueue.remove(task);
if (!remove) {

View File

@ -49,7 +49,7 @@ public class SIPRequestHeaderProvider {
// sipuri
SipURI requestURI = SipFactory.getInstance().createAddressFactory().createSipURI(device.getDeviceId(), device.getHostAddress());
// via
ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
ArrayList<ViaHeader> viaHeaders = new ArrayList<>();
ViaHeader viaHeader = SipFactory.getInstance().createHeaderFactory().createViaHeader(sipLayer.getLocalIp(device.getLocalIp()), sipConfig.getPort(), device.getTransport(), viaTag);
viaHeader.setRPort();
viaHeaders.add(viaHeader);

View File

@ -69,8 +69,9 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
responseAck(request, Response.BAD_REQUEST);
return;
}
String platformId = SipUtils.getUserIdFromFromHeader(request);
String cmd = XmlUtil.getText(rootElement, "CmdType");
log.info("[收到订阅请求] 类型: {}", cmd);
log.info("[收到订阅请求] 类型: {}, 来自: {}", cmd, platformId);
if (CmdType.MOBILE_POSITION.equals(cmd)) {
processNotifyMobilePosition(request, rootElement);
// } else if (CmdType.ALARM.equals(cmd)) {

View File

@ -131,6 +131,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
Element itemDevice = deviceListIterator.next();
Element channelDeviceElement = itemDevice.element("DeviceID");
if (channelDeviceElement == null) {
// 总数减一 避免最后总数不对 无法确定问题
continue;
}
// 从xml解析内容到 DeviceChannel 对象

View File

@ -62,6 +62,11 @@ public class RedisRpcDeviceController extends RpcController {
response.setBody("param error");
return response;
}
if (device.getRegisterTime() == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("设备尚未注册过");
return response;
}
WVPResult<SyncStatus> result = deviceService.devicesSync(device);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(JSONObject.toJSONString(result));

View File

@ -74,7 +74,7 @@ public class RedisRpcPlatformController extends RpcController {
List<CommonGBChannel> channels = jsonObject.getJSONArray("channels").toJavaList(CommonGBChannel.class);
String type = jsonObject.getString("type");
eventPublisher.catalogEventPublish(platform, channels, type, false);
eventPublisher.catalogEventPublish(platform, channels, type);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(ErrorCode.SUCCESS.getCode());
return response;

View File

@ -2,4 +2,4 @@ spring:
application:
name: wvp
profiles:
active: 274
active: 274-dev

View File

@ -861,7 +861,7 @@
320623,如东县,3206
320681,启东市,3206
320682,如皋市,3206
320684,海门,3206
320684,海门,3206
320685,海安市,3206
3207,连云港市,32
320703,连云区,3207
@ -918,8 +918,6 @@
33,浙江省,
3301,杭州市,33
330102,上城区,3301
330103,下城区,3301
330104,江干区,3301
330105,拱墅区,3301
330106,西湖区,3301
330108,滨江区,3301
@ -927,6 +925,8 @@
330110,余杭区,3301
330111,富阳区,3301
330112,临安区,3301
330113,临平区,3301
330114,钱塘区,3301
330122,桐庐县,3301
330127,淳安县,3301
330182,建德市,3301

1 编号 名称 上级
861 320623 如东县 3206
862 320681 启东市 3206
863 320682 如皋市 3206
864 320684 海门市 海门区 3206
865 320685 海安市 3206
866 3207 连云港市 32
867 320703 连云区 3207
918 33 浙江省
919 3301 杭州市 33
920 330102 上城区 3301
330103 下城区 3301
330104 江干区 3301
921 330105 拱墅区 3301
922 330106 西湖区 3301
923 330108 滨江区 3301
925 330110 余杭区 3301
926 330111 富阳区 3301
927 330112 临安区 3301
928 330113 临平区 3301
929 330114 钱塘区 3301
930 330122 桐庐县 3301
931 330127 淳安县 3301
932 330182 建德市 3301

View File