Merge branch 'refs/heads/dev/压力测试' into 报警管理

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java
This commit is contained in:
lin 2026-03-31 09:10:02 +08:00
commit 01c71f8800
78 changed files with 1098 additions and 859 deletions

View File

@ -60,6 +60,7 @@
<asciidoctor.html.output.directory>${project.build.directory}/asciidoc/html</asciidoctor.html.output.directory>
<asciidoctor.pdf.output.directory>${project.build.directory}/asciidoc/pdf</asciidoctor.pdf.output.directory>
<java.version>21</java.version>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
</properties>

View File

@ -1,5 +1,6 @@
package com.genersoft.iot.vmp;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.jt1078.util.ClassUtil;
import com.genersoft.iot.vmp.utils.GitUtil;
import com.genersoft.iot.vmp.utils.SpringBeanFactory;
@ -43,6 +44,7 @@ public class VManageBootstrap extends SpringBootServletInitializer {
log.info("构建时间: {}", gitUtil.getBuildDate());
log.info("GIT信息 分支: {}, ID: {}, 时间: {}", gitUtil.getBranch(), gitUtil.getCommitIdShort(), gitUtil.getCommitTime());
}
}
// 项目重启
public static void restart() {

View File

@ -19,6 +19,8 @@ public class VideoManagerConstants {
public static final String ONLINE_MEDIA_SERVERS_PREFIX = "VMP_ONLINE_MEDIA_SERVERS:";
public static final String DEVICE_PREFIX = "VMP_DEVICE_INFO";
public static final String DEVICE_KEEPALIVE_PREFIX = "VMP_DEVICE_KEEPALIVE:";
public static final String DEVICE_REGISTER_PREFIX = "VMP_DEVICE_REGISTER:";
public static final String INVITE_PREFIX = "VMP_GB_INVITE_INFO";

View File

@ -2,11 +2,11 @@ package com.genersoft.iot.vmp.conf;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;
import jakarta.annotation.PostConstruct;
import java.time.Instant;
import java.util.Date;
import java.util.Map;
@ -23,20 +23,12 @@ import java.util.concurrent.TimeUnit;
@Component
public class DynamicTask {
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
@Autowired
private TaskScheduler taskScheduler;
private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
private final Map<String, Runnable> runnableMap = new ConcurrentHashMap<>();
@PostConstruct
public void DynamicTask() {
threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(300);
threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskScheduler.setAwaitTerminationSeconds(10);
threadPoolTaskScheduler.setThreadNamePrefix("dynamicTask-");
threadPoolTaskScheduler.initialize();
}
/**
* 循环执行的任务
@ -60,7 +52,7 @@ public class DynamicTask {
}
// scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period cycleForCatalog表示执行的间隔
future = threadPoolTaskScheduler.scheduleAtFixedRate(task, new Date(System.currentTimeMillis() + cycleForCatalog), cycleForCatalog);
future = taskScheduler.scheduleAtFixedRate(task, new Date(System.currentTimeMillis() + cycleForCatalog), cycleForCatalog);
if (future != null){
futureMap.put(key, future);
runnableMap.put(key, task);
@ -96,7 +88,7 @@ public class DynamicTask {
}
}
// scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period cycleForCatalog表示执行的间隔
future = threadPoolTaskScheduler.schedule(task, startInstant);
future = taskScheduler.schedule(task, startInstant);
if (future != null){
futureMap.put(key, future);
runnableMap.put(key, task);

View File

@ -1,40 +0,0 @@
package com.genersoft.iot.vmp.conf;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import static com.genersoft.iot.vmp.conf.ThreadPoolTaskConfig.cpuNum;
/**
* "@Scheduled"是Spring框架提供的一种定时任务执行机制默认情况下它是单线程的在同时执行多个定时任务时可能会出现阻塞和性能问题
* 为了解决这种单线程瓶颈问题可以将定时任务的执行机制改为支持多线程
*/
@Configuration
public class ScheduleConfig implements SchedulingConfigurer {
/**
* 核心线程数默认线程数
*/
private static final int corePoolSize = Math.max(cpuNum, 20);
/**
* 线程池名前缀
*/
private static final String threadNamePrefix = "schedule";
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(corePoolSize,
new BasicThreadFactory.Builder().namingPattern(threadNamePrefix).daemon(true).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
taskRegistrar.setScheduler(scheduledThreadPoolExecutor);
}
}

View File

@ -0,0 +1,20 @@
package com.genersoft.iot.vmp.conf;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@Configuration
public class SchedulingConfig {
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(5);
scheduler.setThreadNamePrefix("scheduled-");
scheduler.setVirtualThreads(true); // 必须在 initialize() 之前
scheduler.initialize();
return scheduler;
}
}

View File

@ -20,22 +20,22 @@ public class SystemInfoTimerTask {
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Scheduled(fixedRate = 2000) //每1秒执行一次
public void execute(){
try {
double cpuInfo = SystemInfoUtils.getCpuInfo();
redisCatchStorage.addCpuInfo(cpuInfo);
double memInfo = SystemInfoUtils.getMemInfo();
redisCatchStorage.addMemInfo(memInfo);
Map<String, Double> networkInterfaces = SystemInfoUtils.getNetworkInterfaces();
redisCatchStorage.addNetInfo(networkInterfaces);
List<Map<String, Object>> diskInfo =SystemInfoUtils.getDiskInfo();
redisCatchStorage.addDiskInfo(diskInfo);
} catch (InterruptedException e) {
log.error("[获取系统信息失败] {}", e.getMessage());
}
}
// @Scheduled(fixedRate = 2000) //每1秒执行一次
// public void execute(){
// try {
// double cpuInfo = SystemInfoUtils.getCpuInfo();
// redisCatchStorage.addCpuInfo(cpuInfo);
// double memInfo = SystemInfoUtils.getMemInfo();
// redisCatchStorage.addMemInfo(memInfo);
// Map<String, Double> networkInterfaces = SystemInfoUtils.getNetworkInterfaces();
// redisCatchStorage.addNetInfo(networkInterfaces);
// List<Map<String, Object>> diskInfo =SystemInfoUtils.getDiskInfo();
// redisCatchStorage.addDiskInfo(diskInfo);
// } catch (InterruptedException e) {
// log.error("[获取系统信息失败] {}", e.getMessage());
// }
//
// }
}

View File

@ -1,67 +0,0 @@
package com.genersoft.iot.vmp.conf;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* ThreadPoolTask 配置类
* @author lin
*/
@Configuration
@Order(1)
@EnableAsync(proxyTargetClass = true)
public class ThreadPoolTaskConfig {
public static final int cpuNum = Runtime.getRuntime().availableProcessors();
/**
* 默认情况下在创建了线程池后线程池中的线程数为0当有任务来之后就会创建一个线程去执行任务
* 当线程池中的线程数目达到corePoolSize后就会把到达的任务放到缓存队列当中
* 当队列满了就继续创建线程当线程数量大于等于maxPoolSize后开始使用拒绝策略拒绝
*/
/**
* 核心线程数默认线程数
*/
private static final int corePoolSize = Math.max(cpuNum * 2, 16);
/**
* 最大线程数
*/
private static final int maxPoolSize = corePoolSize * 10;
/**
* 允许线程空闲时间单位默认为秒
*/
private static final int keepAliveTime = 30;
/**
* 缓冲队列大小
*/
private static final int queueCapacity = 10000;
/**
* 线程池名前缀
*/
private static final String threadNamePrefix = "async-";
@Bean("taskExecutor") // bean的名称默认为首字母小写的方法名
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveTime);
executor.setThreadNamePrefix(threadNamePrefix);
// 线程池对拒绝任务的处理策略
// CallerRunsPolicy由调用线程提交任务的线程处理该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化
executor.initialize();
return executor;
}
}

View File

@ -224,4 +224,11 @@ public class UserSetting {
*/
private boolean deviceIdStrict = true;
/**
* 对于识别为设备的国标设备的是否默认开启位置订阅
*/
private boolean subscribeMobilePosition = false;
}

View File

@ -11,11 +11,10 @@ import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
@ -43,9 +42,8 @@ public class RedisRpcConfig implements MessageListener {
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
private TaskExecutor taskExecutor;
private final static Map<String, RedisRpcClassHandler> protocolHash = new HashMap<>();

View File

@ -6,6 +6,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
@ -43,4 +44,22 @@ public class RedisTemplateConfig {
return redisTemplate;
}
@Bean
public RedisTemplate<String, Long> redisLongTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Long> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
// Key 使用 String 序列化
template.setKeySerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
// Value 使用 GenericToStringSerializer它能将 Long 转换为纯文本字符串存入 Redis
// 这样在 Redis 命令行输入 'get key' 看到的是 "123" 而不是二进制乱码
template.setValueSerializer(new GenericToStringSerializer<>(Long.class));
template.setHashValueSerializer(new GenericToStringSerializer<>(Long.class));
template.afterPropertiesSet();
return template;
}
}

View File

@ -89,15 +89,15 @@ public class Device {
/**
* 注册时间
*/
@Schema(description = "注册时间")
private String registerTime;
@Schema(description = "注册时间")
private Long registerTimeStamp;
/**
* 心跳时间
*/
@Schema(description = "心跳时间")
private String keepaliveTime;
private Long keepaliveTimeStamp;
/**
@ -216,4 +216,18 @@ public class Device {
public boolean checkWgs84() {
return geoCoordSys.equalsIgnoreCase("WGS84");
}
public Integer getHeartBeatCount() {
if (heartBeatCount == null) {
return 3;
}
return heartBeatCount;
}
public Integer getHeartBeatInterval() {
if (heartBeatCount == null) {
return 60;
}
return heartBeatInterval;
}
}

View File

@ -0,0 +1,20 @@
package com.genersoft.iot.vmp.gb28181.bean;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@NoArgsConstructor
@Schema(description = "时间统计信息")
public class TimeStatistics {
@Schema(description = "时间")
private String time;
@Schema(description = "时间差")
private Long timeDiff;
}

View File

@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.gb28181.bean.TimeStatistics;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
@ -21,8 +22,10 @@ import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.io.IOUtils;
import org.apache.ibatis.annotations.Options;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
@ -31,12 +34,11 @@ import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.http.HttpServletResponse;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.List;
@Tag(name = "国标设备查询", description = "国标设备查询")
@SuppressWarnings("rawtypes")
@ -136,7 +138,7 @@ public class DeviceQuery {
log.debug("设备通道信息同步API调用deviceId" + deviceId);
}
Device device = deviceService.getDeviceByDeviceId(deviceId);
if (device.getRegisterTime() == null) {
if (device.getTransport() == null) {
WVPResult<SyncStatus> wvpResult = new WVPResult<>();
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg("设备尚未注册过");
@ -155,7 +157,7 @@ public class DeviceQuery {
log.debug("设备信息删除API调用deviceId" + deviceId);
}
// 清除redis记录
// 清除 redis 记录
deviceService.delete(deviceId);
JSONObject json = new JSONObject();
json.put("deviceId", deviceId);
@ -181,8 +183,7 @@ public class DeviceQuery {
DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId,channelId);
if (deviceChannel == null) {
PageInfo<DeviceChannel> deviceChannelPageResult = new PageInfo<>();
return deviceChannelPageResult;
return new PageInfo<>();
}
return deviceChannelService.getSubChannels(deviceChannel.getDataDeviceId(), channelId, query, channelType, online, page, count);
@ -431,6 +432,30 @@ public class DeviceQuery {
deviceService.subscribeMobilePosition(id, cycle, interval);
}
@GetMapping("/statistics/keepalive")
@Operation(summary = "请求心跳统计")
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "count", description = "返回的数量,按时间正向排序,返回的最新的", required = true)
public List<TimeStatistics> getKeepaliveTimeStatistics(String deviceId, Integer count) {
if (ObjectUtils.isEmpty(deviceId)) {
return List.of();
}
return deviceService.getKeepaliveTimeStatistics(deviceId, count);
}
@GetMapping("/statistics/register")
@Operation(summary = "请求注册统计")
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "count", description = "返回的数量,按时间正向排序,返回的最新的", required = true)
public List<TimeStatistics> getRegisterTimeStatistics(String deviceId, Integer count) {
if (ObjectUtils.isEmpty(deviceId)) {
return List.of();
}
return deviceService.getRegisterTimeStatistics(deviceId, count);
}
@GetMapping("/subscribe/alarm")
@Operation(summary = "开启/关闭报警订阅")
@Parameter(name = "id", description = "通道的Id", required = true)

View File

@ -591,6 +591,9 @@ public interface CommonGBChannelMapper {
@SelectProvider(type = ChannelProvider.class, method = "queryOnlineListsByGbDeviceId")
List<CommonGBChannel> queryOnlineListsByGbDeviceId(@Param("deviceId") int deviceId);
@SelectProvider(type = ChannelProvider.class, method = "queryOnlineListsByGbDeviceIds")
List<CommonGBChannel> queryOnlineListsByGbDeviceIds(List<Device> deviceList);
@SelectProvider(type = ChannelProvider.class, method = "queryCommonChannelByDeviceChannel")
CommonGBChannel queryCommonChannelByDeviceChannel(@Param("dataType") Integer dataType, @Param("dataDeviceId") Integer dataDeviceId, @Param("deviceId") String deviceId);

View File

@ -610,5 +610,11 @@ public interface DeviceChannelMapper {
@Update(value = {"UPDATE wvp_device_channel SET status = 'OFF' WHERE data_type = 1 and data_device_id=#{deviceId}"})
void offlineByDeviceId(@Param("deviceId") int deviceId);
@Update(value = {"<script>" +
"UPDATE wvp_device_channel SET status = 'OFF' WHERE data_type = 1 and data_device_id in " +
" <foreach item='item' index='index' collection='deviceList' open='(' separator=',' close=')'> #{item.id} </foreach>" +
" </script>"})
void offlineByDeviceIds(List<Device> deviceList);
}

View File

@ -30,8 +30,6 @@ public interface DeviceMapper {
"port," +
"host_address," +
"expires," +
"register_time," +
"keepalive_time," +
"create_time," +
"update_time," +
"charset," +
@ -65,8 +63,6 @@ public interface DeviceMapper {
"port," +
"host_address," +
"expires," +
"register_time," +
"keepalive_time," +
"heart_beat_interval," +
"heart_beat_count," +
"position_capability," +
@ -98,8 +94,6 @@ public interface DeviceMapper {
"#{port}," +
"#{hostAddress}," +
"#{expires}," +
"#{registerTime}," +
"#{keepaliveTime}," +
"#{heartBeatInterval}," +
"#{heartBeatCount}," +
"#{positionCapability}," +
@ -133,8 +127,6 @@ public interface DeviceMapper {
", port=#{port}" +
", host_address=#{hostAddress}" +
", on_line=#{onLine}" +
", register_time=#{registerTime}" +
", keepalive_time=#{keepaliveTime}" +
", heart_beat_interval=#{heartBeatInterval}" +
", position_capability=#{positionCapability}" +
", heart_beat_count=#{heartBeatCount}" +
@ -166,8 +158,6 @@ public interface DeviceMapper {
"port,"+
"host_address,"+
"expires,"+
"register_time,"+
"keepalive_time,"+
"create_time,"+
"update_time,"+
"charset,"+
@ -208,8 +198,6 @@ public interface DeviceMapper {
"port,"+
"host_address,"+
"expires,"+
"register_time,"+
"keepalive_time,"+
"create_time,"+
"update_time,"+
"charset,"+
@ -242,8 +230,6 @@ public interface DeviceMapper {
"port,"+
"host_address,"+
"expires,"+
"register_time,"+
"keepalive_time,"+
"create_time,"+
"update_time,"+
"charset,"+
@ -277,8 +263,6 @@ public interface DeviceMapper {
"port,"+
"host_address,"+
"expires,"+
"register_time,"+
"keepalive_time,"+
"create_time,"+
"update_time,"+
"charset,"+
@ -356,8 +340,6 @@ public interface DeviceMapper {
",transport" +
",stream_mode" +
",on_line" +
",register_time" +
",keepalive_time" +
",ip" +
",create_time" +
",update_time" +
@ -444,8 +426,6 @@ public interface DeviceMapper {
", port=#{item.port}" +
", host_address=#{item.hostAddress}" +
", on_line=#{item.onLine}" +
", register_time=#{item.registerTime}" +
", keepalive_time=#{item.keepaliveTime}" +
", heart_beat_interval=#{item.heartBeatInterval}" +
", position_capability=#{item.positionCapability}" +
", heart_beat_count=#{item.heartBeatCount}" +
@ -460,7 +440,6 @@ public interface DeviceMapper {
"</script>"})
void batchUpdate(List<Device> devices);
@Select(value = {" <script>" +
"SELECT " +
"coalesce(custom_name, name) as name, " +
@ -472,8 +451,6 @@ public interface DeviceMapper {
",transport" +
",stream_mode" +
",on_line" +
",register_time" +
",keepalive_time" +
",ip" +
",create_time" +
",update_time" +

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.dao.provider;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Group;
import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
import com.genersoft.iot.vmp.web.custom.bean.CameraGroup;
@ -601,6 +602,28 @@ public class ChannelProvider {
return sqlBuild.toString();
}
public String queryOnlineListsByGbDeviceIds(Map<String, Object> params ){
StringBuilder sqlBuild = new StringBuilder();
sqlBuild.append(BASE_SQL_TABLE_NAME);
sqlBuild.append(" where wdc.channel_type = 0 AND coalesce(wdc.gb_status, wdc.status) = 'ON' AND wdc.data_type = 1 ");
List<Device> deviceList = (List<Device>)params.get("deviceList");
if (deviceList != null && !deviceList.isEmpty()) {
sqlBuild.append(" AND data_device_id in (");
boolean first = true;
for (Device device : deviceList) {
if (!first) {
sqlBuild.append(",");
}
sqlBuild.append("'" + device.getId() + "'");
first = false;
}
sqlBuild.append(" )");
}
return sqlBuild.toString();
}
public String queryListForSy(Map<String, Object> params ){
StringBuilder sqlBuild = new StringBuilder();
sqlBuild.append(BASE_SQL_FOR_CAMERA_DEVICE);

View File

@ -1,12 +1,10 @@
package com.genersoft.iot.vmp.gb28181.event;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEvent;
import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent;
import com.genersoft.iot.vmp.gb28181.event.device.DeviceOfflineEvent;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent;
import com.genersoft.iot.vmp.media.bean.MediaServer;
@ -120,4 +118,9 @@ public class EventPublisher {
}
public void deviceOfflineEventPublish(Set<String> deviceIds) {
DeviceOfflineEvent event = new DeviceOfflineEvent(this);
event.setDeviceIds(deviceIds);
applicationEventPublisher.publishEvent(event);
}
}

View File

@ -0,0 +1,23 @@
package com.genersoft.iot.vmp.gb28181.event.device;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;
import java.io.Serial;
import java.util.Set;
@Getter
@Setter
public class DeviceOfflineEvent extends ApplicationEvent {
private Set<String> deviceIds;
@Serial
private static final long serialVersionUID = 1L;
public DeviceOfflineEvent(Object source) {
super(source);
}
}

View File

@ -0,0 +1,22 @@
package com.genersoft.iot.vmp.gb28181.event.device;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;
import java.io.Serial;
@Getter
@Setter
public class DeviceOnlineEvent extends ApplicationEvent {
private Device device;
@Serial
private static final long serialVersionUID = 1L;
public DeviceOnlineEvent(Object source) {
super(source);
}
}

View File

@ -159,6 +159,7 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
List<CommonGBChannel> channelList = new ArrayList<>();
CommonGBChannel deviceChannel = channelMap.get(gbId);
channelList.add(deviceChannel);
try {
sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), platform, channelList, subscribeInfo, null);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException |

View File

@ -21,13 +21,12 @@ public interface IDeviceService {
* 设备上线
* @param device 设备信息
*/
void online(Device device, SipTransactionInfo sipTransactionInfo);
void online(Device device);
/**
* 设备下线
* @param deviceId 设备编号
*/
void offline(String deviceId, String reason, boolean check);
void offline(Device device);
/**
* 添加目录订阅
@ -96,13 +95,6 @@ public interface IDeviceService {
List<Device> getAllByStatus(Boolean status);
/**
* 判断是否注册已经失效
* @param device 设备信息
* @return 布尔
*/
boolean expire(Device device);
/**
* 检查设备状态
* @param device 设备信息
@ -209,4 +201,7 @@ public interface IDeviceService {
void queryPreset(Device device, String channelId, ErrorCallback<List<Preset>> callback);
List<TimeStatistics> getKeepaliveTimeStatistics(String deviceId, Integer count);
List<TimeStatistics> getRegisterTimeStatistics(String deviceId, Integer count);
}

View File

@ -97,7 +97,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
/**
* 监听录像查询结束事件
*/
@Async("taskExecutor")
@Async
@org.springframework.context.event.EventListener
public void onApplicationEvent(RecordInfoEndEvent event) {
SynchronousQueue<RecordInfo> queue = topicSubscribers.get("record" + event.getRecordInfo().getSn());

View File

@ -12,13 +12,12 @@ import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent;
import com.genersoft.iot.vmp.gb28181.event.device.DeviceOfflineEvent;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTask;
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskInfo;
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskRunner;
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusManager;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskInfo;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskRunner;
@ -45,7 +44,9 @@ import jakarta.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -55,7 +56,6 @@ import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import java.text.ParseException;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
@ -123,7 +123,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
private SubscribeTaskRunner subscribeTaskRunner;
@Autowired
private DeviceStatusTaskRunner deviceStatusTaskRunner;
private DeviceStatusManager deviceStatusManager;
private Device getDeviceByDeviceIdFromDb(String deviceId) {
return deviceMapper.getDeviceByDeviceId(deviceId);
@ -132,10 +132,11 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Override
public void run(String... args) throws Exception {
// 清理数据库不存在但是redis中存在的数据
// 清理数据库不存在但是 redis 中存在的数据
List<Device> devicesInDb = getAll();
if (devicesInDb.isEmpty()) {
redisCatchStorage.removeAllDevice();
deviceStatusManager.clear();
}else {
List<Device> devicesInRedis = redisCatchStorage.getAllDevices();
if (!devicesInRedis.isEmpty()) {
@ -152,30 +153,26 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
}
// 重置cseq计数
// 重置 cseq 计数
redisCatchStorage.resetAllCSEQ();
// 处理设备状态
List<DeviceStatusTaskInfo> allTaskInfo = deviceStatusTaskRunner.getAllTaskInfo();
List<String> onlineDeviceIds = new ArrayList<>();
if (!allTaskInfo.isEmpty()) {
for (DeviceStatusTaskInfo taskInfo : allTaskInfo) {
Device device = getDeviceByDeviceId(taskInfo.getDeviceId());
if (device == null) {
deviceStatusTaskRunner.removeTask(taskInfo.getDeviceId());
continue;
}
// 恢复定时任务, TCP因为连接已经断开必须等待设备重新连接
DeviceStatusTask deviceStatusTask = DeviceStatusTask.getInstance(taskInfo.getDeviceId(),
taskInfo.getTransactionInfo(), taskInfo.getExpireTime() + 1000 + System.currentTimeMillis(), this::deviceStatusExpire);
deviceStatusTaskRunner.addTask(deviceStatusTask);
onlineDeviceIds.add(taskInfo.getDeviceId());
}
dbStatusCheck();
}
/**
* 数据库状态检查, 每6小时检查一次
*/
@Scheduled(fixedDelay = 6, initialDelay = 6, timeUnit = TimeUnit.HOURS)
public void dbStatusCheck(){
// 处理设备状态
Set<String> allDeviceIds = deviceStatusManager.getAll();
if (!allDeviceIds.isEmpty()) {
// 除了记录的设备以外 其他设备全部离线
List<Device> onlineDevice = getAllOnlineDevice(userSetting.getServerId());
if (!onlineDevice.isEmpty()) {
List<Device> offlineDevices = new ArrayList<>();
for (Device device : onlineDevice) {
if (!onlineDeviceIds.contains(device.getDeviceId())) {
if (!allDeviceIds.contains(device.getDeviceId())) {
// 此设备需要离线
device.setOnLine(false);
// 清理离线设备的相关缓存
@ -208,7 +205,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
continue;
}
Device device = getDeviceByDeviceId(taskInfo.getDeviceId());
if (device == null || !device.isOnLine() || !onlineDeviceIds.contains(taskInfo.getDeviceId())) {
if (device == null || !device.isOnLine() || !allDeviceIds.contains(taskInfo.getDeviceId())) {
subscribeTaskRunner.removeSubscribe(taskInfo.getKey());
continue;
}
@ -233,6 +230,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
}
}
}
private void offlineByIds(List<Device> offlineDevices) {
@ -240,10 +238,22 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
log.info("[更新多个离线设备信息] 参数为空");
return;
}
deviceMapper.offlineByList(offlineDevices);
int limitCount = 300;
if (offlineDevices.size() > limitCount) {
for (int i = 0; i < offlineDevices.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > offlineDevices.size()) {
toIndex = offlineDevices.size();
}
deviceMapper.offlineByList(offlineDevices.subList(i, toIndex));
}
}else {
deviceMapper.offlineByList(offlineDevices);
}
for (Device device : offlineDevices) {
device.setOnLine(false);
redisCatchStorage.updateDevice(device);
deviceStatusManager.remove(device.getDeviceId());
}
}
@ -254,9 +264,11 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
subscribeTaskRunner.removeSubscribe(SubscribeTaskForMobilPosition.getKey(device));
}
// 离线释放所有 ssrc
if (subscribeTaskRunner.containsKey(SubscribeTaskForAlarm.getKey(device))) {
subscribeTaskRunner.removeSubscribe(SubscribeTaskForAlarm.getKey(device));
}
deviceStatusManager.remove(device.getDeviceId());
// 离线释放所有ssrc
List<SsrcTransaction> ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(device.getDeviceId());
if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) {
@ -286,13 +298,17 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
}
private void deviceStatusExpire(String deviceId, SipTransactionInfo transactionInfo) {
log.info("[设备状态] 到期, 编号: {}", deviceId);
offline(deviceId, "保活到期", true);
// 监听设备过期事件
@Async
@EventListener
public void onApplicationEvent(DeviceOfflineEvent event) {
log.info("[设备状态] 到期, 编号: {}", event.getDeviceIds().toString());
List<Device> deviceList = redisCatchStorage.getDeviceList(event.getDeviceIds());
offline(deviceList);
}
@Override
public void online(Device device, SipTransactionInfo sipTransactionInfo) {
public void online(Device device) {
log.info("[设备上线] deviceId{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort());
Device deviceInRedis = redisCatchStorage.getDevice(device.getDeviceId());
Device deviceInDb = getDeviceByDeviceIdFromDb(device.getDeviceId());
@ -303,20 +319,11 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
inviteStreamService.clearInviteInfo(device.getDeviceId());
}
device.setUpdateTime(now);
device.setKeepaliveTime(now);
if (device.getHeartBeatCount() == null) {
// 读取设备配置 获取心跳间隔和心跳超时次数 在次之前暂时设置为默认值
device.setHeartBeatCount(3);
device.setHeartBeatInterval(60);
device.setPositionCapability(0);
}
if (sipTransactionInfo != null) {
device.setSipTransactionInfo(sipTransactionInfo);
}else {
if (deviceInRedis != null) {
device.setSipTransactionInfo(deviceInRedis.getSipTransactionInfo());
}
}
// 第一次上线 或则设备之前是离线状态--进行通道同步和设备信息查询
@ -336,6 +343,14 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 查询设备信息: {}", e.getMessage());
}
// 上线添加订阅
if (userSetting.isSubscribeMobilePosition() && isDevice(device.getDeviceId())) {
// 开启订阅
device.setSubscribeCycleForMobilePosition(60);
device.setMobilePositionSubmissionInterval(5);
addMobilePositionSubscribe(device, null);
}
sync(device);
}else {
device.setServerId(userSetting.getServerId());
@ -364,13 +379,20 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
if (device.getSubscribeCycleForMobilePosition() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
addMobilePositionSubscribe(device, null);
}else{
if (userSetting.isSubscribeMobilePosition() && isDevice(device.getDeviceId())) {
// 开启订阅
device.setSubscribeCycleForMobilePosition(60);
device.setMobilePositionSubmissionInterval(5);
addMobilePositionSubscribe(device, null);
}
}
if (device.getSubscribeCycleForAlarm() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForAlarm.getKey(device))) {
addAlarmSubscribe(device, null);
}
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
// 发送 redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, true);
}
}else {
@ -382,62 +404,69 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
sync(device);
}
}
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
if (deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
if (sipTransactionInfo == null) {
deviceStatusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis());
}else {
deviceStatusTaskRunner.removeTask(device.getDeviceId());
DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime + System.currentTimeMillis(), this::deviceStatusExpire);
deviceStatusTaskRunner.addTask(task);
}
}else {
DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime + System.currentTimeMillis(), this::deviceStatusExpire);
deviceStatusTaskRunner.addTask(task);
}
// 设备状态任务添加
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
deviceStatusManager.add(device.getDeviceId(), expiresTime + System.currentTimeMillis());
}
@Override
@Transactional
public void offline(String deviceId, String reason, boolean check) {
Device device = getDeviceByDeviceIdFromDb(deviceId);
public void offline(Device device) {
if (device == null) {
log.warn("[设备不存在] device{}", deviceId);
log.warn("[设备不存在]");
return;
}
// 主动查询设备状态, 没有HostAddress无法发送请求可能是手动添加的设备
if (check && device.getHostAddress() != null) {
Boolean deviceStatus = getDeviceStatus(device);
if (deviceStatus != null && deviceStatus) {
log.info("[设备离线] 主动探测发现设备在线,暂不处理 device{}", deviceId);
online(device, null);
return;
}
}
log.info("[设备离线] {}, device{} 心跳间隔: {},心跳超时次数: {} 上次心跳时间:{} 上次注册时间: {}", reason, deviceId,
device.getHeartBeatInterval(), device.getHeartBeatCount(), device.getKeepaliveTime(), device.getRegisterTime());
String deviceId = device.getDeviceId();
log.info("[设备离线] device{} 心跳间隔: {},心跳超时次数: {}", deviceId, device.getHeartBeatInterval(), device.getHeartBeatCount());
device.setOnLine(false);
cleanOfflineDevice(device);
redisCatchStorage.updateDevice(device);
deviceMapper.update(device);
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
// 发送 redis 消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, false);
}
if (isDevice(deviceId)) {
channelOfflineByDevice(device);
channelOfflineByDevice(List.of(device));
}
}
private void channelOfflineByDevice(Device device) {
public void offline(List<Device> deviceList) {
if (deviceList == null || deviceList.isEmpty()) {
log.warn("[设备不存在]");
return;
}
List<Device> realDeviceList = new ArrayList<>();
for (Device device : deviceList) {
if (device == null) {
continue;
}
log.info("[设备离线] device{} 心跳间隔: {},心跳超时次数: {}", device.getDeviceId(), device.getHeartBeatInterval(), device.getHeartBeatCount());
device.setOnLine(false);
cleanOfflineDevice(device);
if (isDevice(device.getDeviceId())) {
realDeviceList.add(device);
}
redisCatchStorage.updateDevice(device);
if (userSetting.getDeviceStatusNotify()) {
// 发送 redis 消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, false);
}
}
deviceMapper.offlineByList(deviceList);
if (!realDeviceList.isEmpty()) {
channelOfflineByDevice(realDeviceList);
}
}
private void channelOfflineByDevice(List<Device> deviceList) {
// 进行通道离线
List<CommonGBChannel> channelList = commonGBChannelMapper.queryOnlineListsByGbDeviceId(device.getId());
List<CommonGBChannel> channelList = commonGBChannelMapper.queryOnlineListsByGbDeviceIds(deviceList);
if (channelList.isEmpty()) {
return;
}
deviceChannelMapper.offlineByDeviceId(device.getId());
deviceChannelMapper.offlineByDeviceIds(deviceList);
// 发送通道离线通知
eventPublisher.channelEventPublish(channelList, ChannelEvent.ChannelEventMessageType.OFF);
}
@ -478,25 +507,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
}
// 设备状态丢失检查
@Scheduled(fixedDelay = 30, timeUnit = TimeUnit.SECONDS)
public void lostCheckForStatus(){
// 获取所有设备
List<Device> deviceList = redisCatchStorage.getAllDevices();
if (deviceList.isEmpty()) {
return;
}
for (Device device : deviceList) {
if (device == null || !device.isOnLine() || !userSetting.getServerId().equals(device.getServerId())) {
continue;
}
if (!deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
log.debug("[状态丢失] 执行设备离线, 编号: {},", device.getDeviceId());
offline(device.getDeviceId(), "", true);
}
}
}
private void catalogSubscribeExpire(String deviceId, SipTransactionInfo transactionInfo) {
log.info("[目录订阅] 到期, 编号: {}", deviceId);
Device device = getDeviceByDeviceId(deviceId);
@ -791,13 +801,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
return deviceMapper.getDevices(ChannelDataType.GB28181, status);
}
@Override
public boolean expire(Device device) {
Instant registerTimeDate = Instant.from(DateUtil.formatter.parse(device.getRegisterTime()));
Instant expireInstant = registerTimeDate.plusMillis(TimeUnit.SECONDS.toMillis(device.getExpires()));
return expireInstant.isBefore(Instant.now());
}
@Override
public Boolean getDeviceStatus(@NotNull Device device) {
SynchronousQueue<String> queue = new SynchronousQueue<>();
@ -921,8 +924,8 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
if (subscribeTaskRunner.containsKey(SubscribeTaskForAlarm.getKey(device))) {
removeAlarmSubscribe(device, null);
}
if (deviceStatusTaskRunner.containsKey(deviceId)) {
deviceStatusTaskRunner.removeTask(deviceId);
if (deviceStatusManager.contains(deviceId)) {
deviceStatusManager.remove(deviceId);
}
List<CommonGBChannel> commonGBChannels = commonGBChannelMapper.queryByDataTypeAndDeviceIds(1, List.of(device.getId()));
@ -1101,9 +1104,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
updateDevice(deviceInDb);
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
if (deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
deviceStatusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis());
}
deviceStatusManager.add(device.getDeviceId(), expiresTime + System.currentTimeMillis());
}
}
@ -1332,9 +1333,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
try {
sipCommander.deviceStatusQuery(device, (code, msg, data) -> {
if ("ONLINE".equalsIgnoreCase(data.trim())) {
online(device, null);
online(device);
}else {
offline(device.getDeviceId(), "设备状态查询结果:" + data.trim(), true);
offline(device);
}
if (callback != null) {
callback.run(code, msg, data);
@ -1408,5 +1409,38 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
}
@Override
public List<TimeStatistics> getKeepaliveTimeStatistics(String deviceId, Integer count) {
List<Long> timeStampList = redisCatchStorage.getDeviceKeepaliveTimeStamp(deviceId, count);
return formateTimeStatistics(timeStampList, count);
}
@Override
public List<TimeStatistics> getRegisterTimeStatistics(String deviceId, Integer count) {
List<Long> timeStampList = redisCatchStorage.getDeviceRegisterTimeStamp(deviceId, count);
return formateTimeStatistics(timeStampList, count);
}
private List<TimeStatistics> formateTimeStatistics(List<Long> timeStampList, Integer count) {
if (timeStampList.isEmpty()) {
return List.of();
}
List<TimeStatistics> timeStatisticsList = new ArrayList<>();
for (int i = 0; i < timeStampList.size(); i++) {
Long timeStamp = timeStampList.get(i);
TimeStatistics timeStatistics = new TimeStatistics();
timeStatistics.setTime(DateUtil.timestampMsTo_yyyy_MM_dd_HH_mm_ss(timeStamp));
if (i > 0) {
Long lastTimeStamp = timeStampList.get(i - 1);
timeStatistics.setTimeDiff((timeStamp - lastTimeStamp) / 1000);
}
timeStatisticsList.add(timeStatistics);
}
// 第一个数据由于没有上一个时间戳无法计算时间差去掉
timeStatisticsList.removeFirst();
if (timeStatisticsList.size() - 1 > count) {
timeStatisticsList = timeStatisticsList.subList(timeStatisticsList.size() - count, timeStatisticsList.size());
}
return timeStatisticsList;
}
}

View File

@ -47,7 +47,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
/**
* 流离开的处理
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
if ("rtsp".equals(event.getSchema()) && MediaApp.GB28181.equals(event.getApp())) {

View File

@ -123,7 +123,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
}
}
}else {
log.info("[Catalog事件: {}] 未找到上级平台: {}", event.getMessageType(), serverGbId);
log.info("[Catalog事件: {}] 没有需要通知的上级平台: {}", event.getMessageType(), serverGbId);
}
}
break;

View File

@ -303,7 +303,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
/**
* 流离开的处理
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
List<SendRtpInfo> sendRtpItems = sendRtpServerService.queryByStream(event.getStream());
@ -330,7 +330,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
/**
* 发流停止
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaSendRtpStoppedEvent event) {
List<SendRtpInfo> sendRtpItems = sendRtpServerService.queryByStream(event.getStream());

View File

@ -130,7 +130,7 @@ public class PlayServiceImpl implements IPlayService {
/**
* 流到来的处理
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
if (MediaApp.GB28181_BROADCAST.equals(event.getApp()) || MediaApp.GB28181_TALK.equals(event.getApp())) {
@ -174,7 +174,7 @@ public class PlayServiceImpl implements IPlayService {
/**
* 流离开的处理
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
List<SendRtpInfo> sendRtpInfos = sendRtpServerService.queryByStream(event.getStream());
@ -243,7 +243,7 @@ public class PlayServiceImpl implements IPlayService {
/**
* 流未找到的处理
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaNotFoundEvent event) {
if (!MediaApp.GB28181.equals(event.getApp())) {

View File

@ -215,7 +215,8 @@ public class CatalogDataManager implements CommandLineRunner {
redisTemplate.delete(key);
}
@Scheduled(fixedDelay = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
//每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
@Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
private void timerTask(){
if (dataMap.isEmpty()) {
return;

View File

@ -0,0 +1,82 @@
package com.genersoft.iot.vmp.gb28181.task.deviceStatus;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class DeviceStatusManager {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private UserSetting userSetting;
@Autowired
private EventPublisher eventPublisher;
private final String prefix = "VMP_DEVICE_EXPIRES";
public String redisKey(){
return String.format("%s_%s", prefix, userSetting.getServerId());
}
/**
* 状态过期检查, 每秒检查一次 系统启动10秒后开始检查
*/
@Scheduled(fixedDelay = 1, initialDelay = 10, timeUnit = TimeUnit.SECONDS)
public void expirationCheck(){
long now = System.currentTimeMillis();
// 获取已过期的 deviceId (Score 介于 0 现在之间)
Set<String> expiredIds = redisTemplate.opsForZSet().rangeByScore(redisKey(), 0, now);
if (expiredIds != null && !expiredIds.isEmpty()) {
redisTemplate.opsForZSet().remove(redisKey(), expiredIds.toArray());
// 使用 JDK 21 虚拟线程异步分发事件
Thread.startVirtualThread(() -> {
// 获取详情后删除缓存
// Device device = redisCatchStorage.getDevice(deviceId);
// redisCatchStorage.removeDevice(deviceId);
// 发送 Spring 异步事件
eventPublisher.deviceOfflineEventPublish(expiredIds);
});
}
}
public void add(String deviceId, long expireTime) {
redisTemplate.opsForZSet().add(redisKey(), deviceId, expireTime);
}
public void remove(String deviceId) {
redisTemplate.opsForZSet().remove(redisKey(), deviceId);
}
public boolean contains(String deviceId) {
if (ObjectUtils.isEmpty(deviceId)) {
return false;
}
return redisTemplate.opsForZSet().score(redisKey(), deviceId) != null;
}
public void clear() {
redisTemplate.opsForZSet().removeRangeByScore(redisKey(), 0, Long.MAX_VALUE);
}
public Set<String> getAll() {
return redisTemplate.opsForZSet().rangeByScore(redisKey(), 0, Long.MAX_VALUE);
}
}

View File

@ -1,60 +0,0 @@
package com.genersoft.iot.vmp.gb28181.task.deviceStatus;
import com.genersoft.iot.vmp.common.DeviceStatusCallback;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@Slf4j
@Data
public class DeviceStatusTask implements Delayed {
private String deviceId;
private SipTransactionInfo transactionInfo;
/**
* 超时时间(单位 毫秒)
*/
private long delayTime;
private DeviceStatusCallback callback;
public static DeviceStatusTask getInstance(String deviceId, SipTransactionInfo transactionInfo, long delayTime, DeviceStatusCallback callback) {
DeviceStatusTask deviceStatusTask = new DeviceStatusTask();
deviceStatusTask.setDeviceId(deviceId);
deviceStatusTask.setTransactionInfo(transactionInfo);
deviceStatusTask.setDelayTime(delayTime);
deviceStatusTask.setCallback(callback);
return deviceStatusTask;
}
public void expired() {
if (callback == null) {
log.info("[设备离线] 未找到过期处理回调, {}", deviceId);
return;
}
callback.run(deviceId, transactionInfo);
}
@Override
public long getDelay(@NotNull TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(@NotNull Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
public DeviceStatusTaskInfo getInfo(){
DeviceStatusTaskInfo taskInfo = new DeviceStatusTaskInfo();
taskInfo.setTransactionInfo(transactionInfo);
taskInfo.setDeviceId(deviceId);
return taskInfo;
}
}

View File

@ -1,17 +0,0 @@
package com.genersoft.iot.vmp.gb28181.task.deviceStatus;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import lombok.Data;
@Data
public class DeviceStatusTaskInfo{
private String deviceId;
private SipTransactionInfo transactionInfo;
/**
* 过期时间,单位毫秒
*/
private long expireTime;
}

View File

@ -1,131 +0,0 @@
package com.genersoft.iot.vmp.gb28181.task.deviceStatus;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class DeviceStatusTaskRunner {
private final Map<String, DeviceStatusTask> subscribes = new ConcurrentHashMap<>();
private final DelayQueue<DeviceStatusTask> delayQueue = new DelayQueue<>();
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private UserSetting userSetting;
private final String prefix = "VMP_DEVICE_STATUS";
// 状态过期检查
@Scheduled(fixedDelay = 500, timeUnit = TimeUnit.MILLISECONDS)
public void expirationCheck(){
while (!delayQueue.isEmpty()) {
DeviceStatusTask take = null;
try {
take = delayQueue.take();
try {
removeTask(take.getDeviceId());
take.expired();
}catch (Exception e) {
log.error("[设备状态到期] 到期处理时出现异常, 设备编号: {} ", take.getDeviceId());
}
} catch (InterruptedException e) {
log.error("[设备状态任务] ", e);
}
}
}
public void addTask(DeviceStatusTask task) {
Duration duration = Duration.ofSeconds((task.getDelayTime() - System.currentTimeMillis())/1000);
if (duration.getSeconds() < 0) {
return;
}
subscribes.put(task.getDeviceId(), task);
String key = String.format("%s_%s_%s", prefix, userSetting.getServerId(), task.getDeviceId());
redisTemplate.opsForValue().set(key, task.getInfo(), duration);
delayQueue.offer(task);
}
public boolean removeTask(String key) {
DeviceStatusTask task = subscribes.get(key);
if (task == null) {
return false;
}
String redisKey = String.format("%s_%s_%s", prefix, userSetting.getServerId(), task.getDeviceId());
redisTemplate.delete(redisKey);
subscribes.remove(key);
if (delayQueue.contains(task)) {
boolean remove = delayQueue.remove(task);
if (!remove) {
log.info("[移除状态任务] 从延时队列内移除失败: {}", key);
}
}
return true;
}
public SipTransactionInfo getTransactionInfo(String key) {
DeviceStatusTask task = subscribes.get(key);
if (task == null) {
return null;
}
return task.getTransactionInfo();
}
public boolean updateDelay(String key, long expirationTime) {
DeviceStatusTask task = subscribes.get(key);
if (task == null) {
return false;
}
log.debug("[更新状态任务时间] 编号: {}", key);
// 如果值更改时间如果队列中有多个元素时 超时无法出发目前采用移除再加入的方法
delayQueue.remove(task);
task.setDelayTime(expirationTime);
delayQueue.offer(task);
String redisKey = String.format("%s_%s_%s", prefix, userSetting.getServerId(), task.getDeviceId());
Duration duration = Duration.ofSeconds((expirationTime - System.currentTimeMillis())/1000);
redisTemplate.expire(redisKey, duration);
return true;
}
public boolean containsKey(String key) {
return subscribes.containsKey(key);
}
public List<DeviceStatusTaskInfo> getAllTaskInfo(){
String scanKey = String.format("%s_%s_*", prefix, userSetting.getServerId());
List<Object> values = RedisUtil.scan(redisTemplate, scanKey);
if (values.isEmpty()) {
return new ArrayList<>();
}
List<DeviceStatusTaskInfo> result = new ArrayList<>();
for (Object value : values) {
String redisKey = (String)value;
DeviceStatusTaskInfo taskInfo = (DeviceStatusTaskInfo)redisTemplate.opsForValue().get(redisKey);
if (taskInfo == null) {
continue;
}
Long expire = redisTemplate.getExpire(redisKey, TimeUnit.MILLISECONDS);
taskInfo.setExpireTime(expire);
result.add(taskInfo);
}
return result;
}
}

View File

@ -59,7 +59,7 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
* @param requestEvent RequestEvent事件
*/
@Override
@Async("taskExecutor")
@Async
public void processRequest(RequestEvent requestEvent) {
String method = requestEvent.getRequest().getMethod();
ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method);
@ -77,7 +77,7 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
* @param responseEvent responseEvent事件
*/
@Override
@Async("taskExecutor")
@Async
public void processResponse(ResponseEvent responseEvent) {
SIPResponse response = (SIPResponse)responseEvent.getResponse();
int status = response.getStatusCode();

View File

@ -78,7 +78,7 @@ public abstract class SIPRequestProcessorParent {
return responseAck(sipRequest, statusCode, null);
}
@Async("taskExecutor")
@Async
public void responseAckAsync(SIPRequest sipRequest, int statusCode) throws SipException, InvalidArgumentException, ParseException {
responseAck(sipRequest, statusCode, null);
}

View File

@ -16,6 +16,7 @@ import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@ -69,6 +70,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
}
@Scheduled(fixedDelay = 400) //每400毫秒执行一次
@Async
public void executeTaskQueue(){
if (taskQueue.isEmpty()) {
return;

View File

@ -17,6 +17,7 @@ import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
@ -61,6 +62,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
}
@Scheduled(fixedDelay = 200) //每200毫秒执行一次
@Async
public void executeTaskQueue() {
if (taskQueue.isEmpty()) {
return;
@ -157,7 +159,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
continue;
}
log.info("[收到移动位置订阅通知]{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
log.info("[收到移动位置订阅通知]{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelDeviceId(),
mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
mobilePosition.setReportSource("Mobile Position");

View File

@ -66,7 +66,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
@Override
public void process(RequestEvent evt) {
try {
responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
responseAckAsync((SIPRequest) evt.getRequest(), Response.OK);
Element rootElement = getRootElement(evt);
if (rootElement == null) {
log.error("处理NOTIFY消息时未获取到消息体,{}", evt.getRequest());

View File

@ -1,12 +1,12 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.common.RemoteAddressInfo;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.auth.DigestServerAuthenticationHelper;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.GbCode;
import com.genersoft.iot.vmp.gb28181.bean.GbSipDate;
import com.genersoft.iot.vmp.common.RemoteAddressInfo;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@ -14,7 +14,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.IpPortUtil;
import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri;
@ -38,6 +38,7 @@ import javax.sip.message.Response;
import java.security.NoSuchAlgorithmException;
import java.text.ParseException;
import java.util.Calendar;
import java.util.List;
import java.util.Locale;
/**
@ -64,6 +65,10 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
@Autowired
private UserSetting userSetting;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
@ -126,10 +131,10 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
String transport = reqViaHeader.getTransport();
device.setTransport("TCP".equalsIgnoreCase(transport) ? "TCP" : "UDP");
sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), registerOkResponse);
device.setRegisterTime(DateUtil.getNow());
deviceService.online(device, null);
device.setRegisterTimeStamp(System.currentTimeMillis());
deviceService.online(device);
} else {
deviceService.offline(deviceId, "主动注销", false);
deviceService.offline(device);
}
return;
}else {
@ -137,6 +142,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
if (!ObjectUtils.isEmpty(device.getPassword()) || !ObjectUtils.isEmpty(sipConfig.getPassword())) {
password = (!ObjectUtils.isEmpty(device.getPassword())) ? device.getPassword() : sipConfig.getPassword();
}
// 如果设置了一个无密码的设备那么这里就会自动跳动后续会直接注册成功
}
}else {
if (ObjectUtils.isEmpty(sipConfig.getPassword())) {
@ -166,7 +172,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
// 注册失败
response = getMessageFactory().createResponse(Response.FORBIDDEN, request);
response.setReasonPhrase("wrong password");
log.info(title + " 设备:{}, 密码/SIP服务器ID错误, 回复403: {}", deviceId, requestAddress);
log.info("{} 设备:{}, 密码/SIP服务器ID错误, 回复403: {}", title, deviceId, requestAddress);
sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response);
return;
}
@ -175,7 +181,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
response = getMessageFactory().createResponse(Response.OK, request);
// 如果主动禁用了Date头则不添加
if (!userSetting.isDisableDateHeader()) {
// 添加date头
// 添加 date头
SIPDateHeader dateHeader = new SIPDateHeader();
// 使用自己修改的
GbSipDate gbSipDate = new GbSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis());
@ -188,9 +194,9 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response);
return;
}
// 添加Contact头
// 添加 Contact头
response.addHeader(request.getHeader(ContactHeader.NAME));
// 添加Expires头
// 添加 Expires头
response.addHeader(request.getExpires());
if (device == null) {
@ -224,7 +230,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
// 注册成功
device.setExpires(request.getExpires().getExpires());
registerFlag = true;
// 判断TCP还是UDP
// 判断 TCP/UDP
ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME);
String transport = reqViaHeader.getTransport();
device.setTransport("TCP".equalsIgnoreCase(transport) ? "TCP" : "UDP");
@ -232,16 +238,18 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response);
// 注册成功
// 保存到redis
device.setRegisterTimeStamp(System.currentTimeMillis());
// 保存到 redis
if (registerFlag) {
log.info("[注册成功] deviceId: {}->{}", deviceId, requestAddress);
device.setRegisterTime(DateUtil.getNow());
SipTransactionInfo sipTransactionInfo = new SipTransactionInfo((SIPResponse) response);
deviceService.online(device, sipTransactionInfo);
device.setSipTransactionInfo(sipTransactionInfo);
deviceService.online(device);
} else {
log.info("[注销成功] deviceId: {}->{}", deviceId, requestAddress);
deviceService.offline(deviceId, "主动注销", false);
deviceService.offline(device);
}
redisCatchStorage.updateDeviceRegisterTimeStamp(List.of(device));
} catch (SipException | NoSuchAlgorithmException | ParseException e) {
log.error("未处理的异常 ", e);
}
@ -268,4 +276,5 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
return response;
}
}

View File

@ -12,6 +12,7 @@ import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
@ -74,9 +75,8 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i
}
}
@Async
public void handMessageEvent(Element element, Object data) {
String cmd = getText(element, "CmdType");
String sn = getText(element, "SN");
MessageEvent<Object> subscribe = (MessageEvent<Object>)messageSubscribe.getSubscribe(cmd + sn);

View File

@ -78,6 +78,12 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
log.error("[Alarm] 待处理消息队列已满 {}返回486 BUSY_HERE消息不做处理", userSetting.getMaxNotifyCountQueue());
return;
}
// 回复200 OK
try {
responseAckAsync((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 报警通知回复: {}", e.getMessage());
}
taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
}
@ -102,12 +108,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
continue;
}
RequestEvent evt = sipMsgInfo.getEvt();
// 回复200 OK
try {
responseAck((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 报警通知回复: {}", e.getMessage());
}
try {
Device device = sipMsgInfo.getDevice();
Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID");

View File

@ -1,18 +1,16 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd;
import com.genersoft.iot.vmp.common.RemoteAddressInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SipMsgInfo;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskRunner;
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusManager;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.IpPortUtil;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
@ -27,9 +25,9 @@ import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 状态信息(心跳)报送
@ -41,7 +39,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
private final static String cmdType = "Keepalive";
private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
private final BlockingQueue<Device> taskQueue = new LinkedBlockingQueue<>();
@Autowired
private NotifyMessageHandler notifyMessageHandler;
@ -50,13 +48,13 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
private IDeviceService deviceService;
@Autowired
private DeviceStatusTaskRunner statusTaskRunner;
private DeviceStatusManager deviceStatusManager;
@Autowired
private UserSetting userSetting;
@Autowired
private DynamicTask dynamicTask;
private IRedisCatchStorage redisCatchStorage;
@Override
public void afterPropertiesSet() throws Exception {
@ -65,80 +63,56 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
@Override
public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
log.error("[心跳] 待处理消息队列已满 {}返回486 BUSY_HERE消息不做处理", userSetting.getMaxNotifyCountQueue());
return;
// 回复200 OK
try {
responseAckAsync((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 心跳回复: {}", e.getMessage());
}
taskQueue.add(device);
SIPRequest request = (SIPRequest) evt.getRequest();
RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress());
if (device.getIp() == null || !device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) {
log.info("[收到心跳] 地址变化, {}({}), {}:{}->{}:{}", device.getName(), device.getDeviceId(), device.getIp(), device.getPort(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort());
device.setPort(remoteAddressInfo.getPort());
device.setHostAddress(IpPortUtil.concatenateIpAndPort(remoteAddressInfo.getIp(), String.valueOf(remoteAddressInfo.getPort())));
device.setIp(remoteAddressInfo.getIp());
device.setLocalIp(request.getLocalAddress().getHostAddress());
}
device.setKeepaliveTimeStamp(System.currentTimeMillis());
if (device.isOnLine()) {
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
deviceStatusManager.add(device.getDeviceId(), expiresTime + System.currentTimeMillis());
} else {
if (userSetting.getGbDeviceOnline() == 1) {
// 对于已经离线的设备判断他的注册是否已经过期
deviceService.online(device);
}
}
taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
}
@Scheduled(fixedDelay = 100)
public void executeTaskQueue() {
if (taskQueue.isEmpty()) {
return;
}
List<SipMsgInfo> handlerCatchDataList = new ArrayList<>();
int size = taskQueue.size();
for (int i = 0; i < size; i++) {
SipMsgInfo poll = taskQueue.poll();
if (poll != null) {
handlerCatchDataList.add(poll);
@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.SECONDS)
public void executeUpdateDeviceList() {
log.debug("[定时任务] 更新心跳记录,待处理设备数量: {}", taskQueue.size());
try {
if (!taskQueue.isEmpty()) {
redisCatchStorage.updateDeviceKeepaliveTimeStamp(taskQueue.stream().toList());
taskQueue.clear();
}
}
if (handlerCatchDataList.isEmpty()) {
return;
}
List<Device> deviceListForUpdate = new ArrayList<>();
for (SipMsgInfo sipMsgInfo : handlerCatchDataList) {
if (sipMsgInfo == null) {
continue;
}
RequestEvent evt = sipMsgInfo.getEvt();
// 回复200 OK
try {
responseAckAsync((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 心跳回复: {}", e.getMessage());
}
Device device = sipMsgInfo.getDevice();
SIPRequest request = (SIPRequest) evt.getRequest();
RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress());
if (device.getIp() == null || !device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) {
log.info("[收到心跳] 地址变化, {}({}), {}:{}->{}", device.getName(), device.getDeviceId(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort(), request.getLocalAddress().getHostAddress());
device.setPort(remoteAddressInfo.getPort());
device.setHostAddress(IpPortUtil.concatenateIpAndPort(remoteAddressInfo.getIp(), String.valueOf(remoteAddressInfo.getPort())));
device.setIp(remoteAddressInfo.getIp());
device.setLocalIp(request.getLocalAddress().getHostAddress());
}
device.setKeepaliveTime(DateUtil.getNow());
if (device.isOnLine()) {
deviceListForUpdate.add(device);
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
if (statusTaskRunner.containsKey(device.getDeviceId())) {
statusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis());
}
} else {
if (userSetting.getGbDeviceOnline() == 1) {
// 对于已经离线的设备判断他的注册是否已经过期
deviceService.online(device, null);
}
}
}
if (!deviceListForUpdate.isEmpty()) {
deviceService.updateDeviceList(deviceListForUpdate);
} catch (Exception e) {
log.error("[定时任务] 更新心跳记录 执行异常", e);
}
}
@Override
public void handForPlatform(RequestEvent evt, Platform parentPlatform, Element element) {
// 个别平台保活不回复200OK会判定离线
// 回复200 OK
try {
responseAck((SIPRequest) evt.getRequest(), Response.OK);
responseAckAsync((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 心跳回复: {}", e.getMessage());
log.error("[命令发送失败] 报警通知回复: {}", e.getMessage());
}
}
}

View File

@ -9,13 +9,13 @@ import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.utils.DateUtil;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
@ -33,6 +33,7 @@ import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
private final String cmdType = "MobilePosition";
@ -45,9 +46,8 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
private TaskExecutor taskExecutor;
@Override
public void afterPropertiesSet() throws Exception {
@ -61,7 +61,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
// 回复200 OK
try {
responseAck((SIPRequest) evt.getRequest(), Response.OK);
responseAckAsync((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 移动位置通知回复: {}", e.getMessage());
}

View File

@ -16,7 +16,6 @@ import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@ -30,6 +29,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.lang.Thread;
/**
* 目录查询的回复
@ -67,136 +67,114 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
@Override
public void handForDevice(RequestEvent evt, Device device, Element element) {
taskQueue.offer(new HandlerCatchData(evt, device, element));
// 回复200 OK
try {
responseAck((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 目录查询回复: {}", e.getMessage());
}
}
@Scheduled(fixedDelay = 50)
@Transactional
public void executeTaskQueue(){
if (taskQueue.isEmpty()) {
return;
}
List<HandlerCatchData> handlerCatchDataList = new ArrayList<>();
int size = taskQueue.size();
for (int i = 0; i < size; i++) {
HandlerCatchData poll = taskQueue.poll();
if (poll != null) {
handlerCatchDataList.add(poll);
}
}
if (handlerCatchDataList.isEmpty()) {
return;
}
for (HandlerCatchData take : handlerCatchDataList) {
if (take == null) {
continue;
}
RequestEvent evt = take.getEvt();
int sn = 0;
// 全局异常捕获保证下一条可以得到处理
int sn = 0;
// 全局异常捕获保证下一条可以得到处理
try {
Element rootElement = null;
try {
Element rootElement = null;
try {
rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset());
} catch (DocumentException e) {
log.error("[xml解析] 失败: ", e);
continue;
}
if (rootElement == null) {
log.warn("[ 收到通道 ] content cannot be null, {}", evt.getRequest());
continue;
}
Element deviceListElement = rootElement.element("DeviceList");
Element sumNumElement = rootElement.element("SumNum");
Element snElement = rootElement.element("SN");
rootElement = getRootElement(evt, device.getCharset());
} catch (DocumentException e) {
log.error("[xml解析] 失败: ", e);
return;
}
if (rootElement == null) {
log.warn("[ 收到通道 ] content cannot be null, {}", evt.getRequest());
return;
}
Element deviceListElement = rootElement.element("DeviceList");
Element sumNumElement = rootElement.element("SumNum");
Element snElement = rootElement.element("SN");
sn = Integer.parseInt(snElement.getText());
int sumNum = Integer.parseInt(sumNumElement.getText());
sn = Integer.parseInt(snElement.getText());
int sumNum = Integer.parseInt(sumNumElement.getText());
if (sumNum == 0) {
log.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId());
// 数据已经完整接收
deviceChannelService.cleanChannelsForDevice(take.getDevice().getId());
// 推送空数据不然无法及时结束
catalogDataCatch.put(take.getDevice().getDeviceId(), sn, 0, take.getDevice(),
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), sn, null);
return;
} else {
Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
if (deviceListIterator != null) {
List<DeviceChannel> channelList = new ArrayList<>();
List<Region> regionList = new ArrayList<>();
List<Group> groupList = new ArrayList<>();
// 遍历DeviceList
while (deviceListIterator.hasNext()) {
Element itemDevice = deviceListIterator.next();
Element channelDeviceElement = itemDevice.element("DeviceID");
if (channelDeviceElement == null) {
// 总数减一 避免最后总数不对 无法确定问题
continue;
}
// 从xml解析内容到 DeviceChannel 对象
DeviceChannel channel = DeviceChannel.decode(itemDevice);
if (channel.getDeviceId() == null) {
log.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
continue;
}
channel.setDataDeviceId(take.getDevice().getId());
if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
channel.setParentId(null);
}
// 解析通道类型
if (channel.getDeviceId().length() <= 8) {
// 行政区划
Region region = Region.getInstance(channel);
regionList.add(region);
channel.setChannelType(1);
}else if (channel.getDeviceId().length() == 20){
// 业务分组/虚拟组织
Group group = Group.getInstance(channel);
if (group != null) {
channel.setParental(1);
channel.setChannelType(2);
groupList.add(group);
}
if (channel.getLongitude() != null && channel.getLatitude() != null && channel.getLongitude() > 0 && channel.getLatitude() > 0) {
Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(channel.getLongitude(), channel.getLatitude());
channel.setGbLongitude(wgs84Position[0]);
channel.setGbLatitude(wgs84Position[1]);
}
}
channelList.add(channel);
if (sumNum == 0) {
log.info("[收到通道]设备:{}的: 0个", device.getDeviceId());
// 数据已经完整接收
deviceChannelService.cleanChannelsForDevice(device.getId());
// 推送空数据不然无法及时结束
catalogDataCatch.put(device.getDeviceId(), sn, 0, device,
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), sn, null);
return;
} else {
Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
if (deviceListIterator != null) {
List<DeviceChannel> channelList = new ArrayList<>();
List<Region> regionList = new ArrayList<>();
List<Group> groupList = new ArrayList<>();
// 遍历DeviceList
while (deviceListIterator.hasNext()) {
Element itemDevice = deviceListIterator.next();
Element channelDeviceElement = itemDevice.element("DeviceID");
if (channelDeviceElement == null) {
// 总数减一 避免最后总数不对 无法确定问题
continue;
}
// 从xml解析内容到 DeviceChannel 对象
DeviceChannel channel = DeviceChannel.decode(itemDevice);
if (channel.getDeviceId() == null) {
log.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
continue;
}
channel.setDataDeviceId(device.getId());
if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
channel.setParentId(null);
}
// 解析通道类型
if (channel.getDeviceId().length() <= 8) {
// 行政区划
Region region = Region.getInstance(channel);
regionList.add(region);
channel.setChannelType(1);
}else if (channel.getDeviceId().length() == 20){
// 业务分组/虚拟组织
Group group = Group.getInstance(channel);
if (group != null) {
channel.setParental(1);
channel.setChannelType(2);
groupList.add(group);
}
if (channel.getLongitude() != null && channel.getLatitude() != null && channel.getLongitude() > 0 && channel.getLatitude() > 0) {
Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(channel.getLongitude(), channel.getLatitude());
channel.setGbLongitude(wgs84Position[0]);
channel.setGbLatitude(wgs84Position[1]);
}
}
channelList.add(channel);
}
catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(),
channelList, regionList, groupList);
log.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.size(take.getDevice().getDeviceId(), sn), sumNum);
}
catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device,
channelList, regionList, groupList);
log.info("[收到通道]设备: {} -> {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.size(device.getDeviceId(), sn), sumNum);
}
} catch (Exception e) {
log.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest());
log.error("[收到通道] 异常内容: ", e);
} finally {
String deviceId = take.getDevice().getDeviceId();
if (catalogDataCatch.size(deviceId, sn) > 0
&& catalogDataCatch.size(deviceId, sn) == catalogDataCatch.sumNum(deviceId, sn)) {
// 数据已经完整接收 此时可能存在某个设备离线变上线的情况但是考虑到性能此处不做处理
// 目前支持设备通道上线通知时和设备上线时向上级通知
boolean resetChannelsResult = saveData(take.getDevice(), sn);
}
} catch (Exception e) {
log.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest());
log.error("[收到通道] 异常内容: ", e);
} finally {
String deviceId = device.getDeviceId();
if (catalogDataCatch.size(deviceId, sn) > 0
&& catalogDataCatch.size(deviceId, sn) == catalogDataCatch.sumNum(deviceId, sn)) {
// 数据已经完整接收 此时可能存在某个设备离线变上线的情况但是考虑到性能此处不做处理
// 目前支持设备通道上线通知时和设备上线时向上级通知
int finalSn = sn;
Thread.startVirtualThread(() -> {
boolean resetChannelsResult = saveData(device, finalSn);
if (!resetChannelsResult) {
String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(deviceId, sn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId(), sn).size() + "";
catalogDataCatch.setChannelSyncEnd(deviceId, sn, errorMsg);
String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(deviceId, finalSn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(device.getDeviceId(), finalSn).size() + "";
catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, errorMsg);
} else {
catalogDataCatch.setChannelSyncEnd(deviceId, sn, null);
catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, null);
}
}
});
}
}
}

View File

@ -51,13 +51,7 @@ public class DeviceStatusResponseMessageHandler extends SIPRequestProcessorParen
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 国标级联 设备状态应答回复200OK: {}", e.getMessage());
}
Element onlineElement = element.element("Online");
JSONObject json = new JSONObject();
XmlUtil.node2Json(element, json);
if (log.isDebugEnabled()) {
log.debug(json.toJSONString());
}
String text = onlineElement.getText();
String text = element.elementText("Online");
responseMessageHandler.handMessageEvent(element, text);
}

View File

@ -5,11 +5,8 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
import com.genersoft.iot.vmp.gb28181.bean.RecordItem;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEndEvent;
import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEvent;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
@ -20,10 +17,8 @@ import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

View File

@ -86,7 +86,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
/**
* 流到来的处理
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
if (event.getApp().equals(talkApp) && event.getStream().endsWith("_talk")) {
@ -115,7 +115,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
/**
* 流离开的处理
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
@ -124,7 +124,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
/**
* 流未找到的处理
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaNotFoundEvent event) {
if (!userSetting.getAutoApplyPlay()) {

View File

@ -119,7 +119,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
/**
* 流到来的处理
*/
@Async("taskExecutor")
@Async
@org.springframework.context.event.EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
@ -128,7 +128,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
/**
* 流离开的处理
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
@ -137,7 +137,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
/**
* 设备更新的通知
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(DeviceUpdateEvent event) {
JTDevice device = event.getDevice();
@ -163,7 +163,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
/**
* 位置更新的通知
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(JTPositionEvent event) {
if (event.getPhoneNumber() == null || event.getPositionInfo() == null

View File

@ -26,10 +26,8 @@ import jakarta.servlet.http.HttpServletRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*;
@ -81,10 +79,6 @@ public class ABLHttpHookListener {
@Autowired
private SSRCFactory ssrcFactory;
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;

View File

@ -59,7 +59,7 @@ public class ABLMediaServerStatusManger {
private final String type = "abl";
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaServerChangeEvent event) {
if (event.getMediaServerItemList() == null
@ -77,7 +77,7 @@ public class ABLMediaServerStatusManger {
execute();
}
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(HookAblServerStartEvent event) {
if (event.getMediaServerItem() == null
@ -93,7 +93,7 @@ public class ABLMediaServerStatusManger {
online(serverItem, null);
}
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(HookAblServerKeepaliveEvent event) {
if (event.getMediaServerItem() == null) {
@ -107,7 +107,7 @@ public class ABLMediaServerStatusManger {
online(serverItem, null);
}
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaServerDeleteEvent event) {
if (event.getMediaServer() == null) {

View File

@ -32,7 +32,7 @@ public class HookSubscribe {
/**
* 流到来的处理
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
if (event.getSchema() == null || "rtsp".equals(event.getSchema())) {
@ -44,7 +44,7 @@ public class HookSubscribe {
/**
* 流结束事件
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
if (event.getSchema() == null || "rtsp".equals(event.getSchema())) {
@ -55,7 +55,7 @@ public class HookSubscribe {
/**
* 推流鉴权事件
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaPublishEvent event) {
sendNotify(HookType.on_publish, event);
@ -63,7 +63,7 @@ public class HookSubscribe {
/**
* 生成录像文件事件
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaRecordMp4Event event) {
sendNotify(HookType.on_record_mp4, event);

View File

@ -1,12 +1,14 @@
package com.genersoft.iot.vmp.media.event.mediaServer;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@Getter
public class MediaServerChangeEvent extends ApplicationEvent {
public MediaServerChangeEvent(Object source) {
@ -15,10 +17,6 @@ public class MediaServerChangeEvent extends ApplicationEvent {
private List<MediaServer> mediaServerItemList;
public List<MediaServer> getMediaServerItemList() {
return mediaServerItemList;
}
public void setMediaServerItemList(List<MediaServer> mediaServerItemList) {
this.mediaServerItemList = mediaServerItemList;
}

View File

@ -22,14 +22,14 @@ public class MediaServerStatusEventListener {
@Autowired
private IPlayService playService;
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaServerOnlineEvent event) {
log.info("[媒体节点] 上线 ID" + event.getMediaServer().getId());
playService.zlmServerOnline(event.getMediaServer());
}
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaServerOfflineEvent event) {

View File

@ -32,6 +32,8 @@ import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import jakarta.validation.constraints.NotNull;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
import okhttp3.Request;
@ -86,7 +88,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
/**
* 流到来的处理
*/
@Async("taskExecutor")
@Async
@org.springframework.context.event.EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
if ("rtsp".equals(event.getSchema())) {
@ -100,7 +102,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
/**
* 流离开的处理
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
if ("rtsp".equals(event.getSchema())) {
@ -119,7 +121,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
/**
* 流媒体节点上线
*/
@Async("taskExecutor")
@Async
@EventListener
@Transactional
public void onApplicationEvent(MediaServerOnlineEvent event) {
@ -130,7 +132,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
/**
* 流媒体节点离线
*/
@Async("taskExecutor")
@Async
@EventListener
@Transactional
public void onApplicationEvent(MediaServerOfflineEvent event) {

View File

@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
@Component
public class ZLMMediaServerStatusManager {
private final Map<Object, MediaServer> offlineZlmPrimaryMap = new ConcurrentHashMap<>();
private final Map<Object, MediaServer> offlineZlmsecondaryMap = new ConcurrentHashMap<>();
private final Map<Object, Long> offlineZlmTimeMap = new ConcurrentHashMap<>();
@ -66,7 +67,7 @@ public class ZLMMediaServerStatusManager {
private final String type = "zlm";
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaServerChangeEvent event) {
if (event.getMediaServerItemList() == null
@ -80,11 +81,10 @@ public class ZLMMediaServerStatusManager {
log.info("[ZLM-添加待上线节点] ID{}", mediaServerItem.getId());
offlineZlmPrimaryMap.put(mediaServerItem.getId(), mediaServerItem);
offlineZlmTimeMap.put(mediaServerItem.getId(), System.currentTimeMillis());
execute();
}
}
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(HookZlmServerStartEvent event) {
if (event.getMediaServer() == null
@ -96,7 +96,7 @@ public class ZLMMediaServerStatusManager {
online(event.getMediaServer(), event.getConfig());
}
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(HookZlmServerKeepaliveEvent event) {
if (event.getMediaServerItem() == null) {
@ -110,7 +110,7 @@ public class ZLMMediaServerStatusManager {
online(mediaServer, null);
}
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaServerDeleteEvent event) {
if (event.getMediaServer() == null) {
@ -183,6 +183,8 @@ public class ZLMMediaServerStatusManager {
MediaServer mediaServerInDb = mediaServerService.getOne(mediaServer.getId());
if (mediaServerInDb == null || !mediaServerInDb.isStatus()) {
log.info("[ZLM-连接成功] ID{}, 地址: {}:{}", mediaServer.getId(), mediaServer.getIp(), mediaServer.getHttpPort());
offlineZlmPrimaryMap.remove(mediaServer.getId());
offlineZlmsecondaryMap.remove(mediaServer.getId());
if (config == null) {
ZLMResult<List<JSONObject>> mediaServerConfig = zlmresTfulUtils.getMediaServerConfig(mediaServer);
List<JSONObject> data = mediaServerConfig.getData();

View File

@ -116,7 +116,7 @@ public class CloudRecordServiceImpl implements ICloudRecordService {
return new ArrayList<>(resultSet);
}
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaRecordMp4Event event) {
CloudRecordItem cloudRecordItem = CloudRecordItem.getInstance(event);

View File

@ -51,7 +51,7 @@ public class RecordPlanServiceImpl implements IRecordPlanService {
/**
* 流离开的处理
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
// 流断开检查是否还处于录像状态 如果是则继续录像

View File

@ -54,7 +54,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
/**
* 流到来的处理
*/
@Async("taskExecutor")
@Async
@org.springframework.context.event.EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
@ -63,7 +63,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
/**
* 流离开的处理
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {

View File

@ -3,12 +3,9 @@ package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
@ -30,9 +27,6 @@ public class RedisPushStreamResponseListener implements MessageListener {
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
private final Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();

View File

@ -62,7 +62,7 @@ public class RedisRpcDeviceController extends RpcController {
response.setBody("param error");
return response;
}
if (device.getRegisterTime() == null) {
if (device.getTransport() == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("设备尚未注册过");
return response;

View File

@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import java.util.List;
import java.util.Map;
import java.util.Set;
public interface IRedisCatchStorage {
@ -79,6 +80,11 @@ public interface IRedisCatchStorage {
*/
Device getDevice(String deviceId);
/**
* 获取Device
*/
List<Device> getDeviceList(Set<String> deviceIds);
void resetAllCSEQ();
void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo);
@ -178,5 +184,11 @@ public interface IRedisCatchStorage {
String chooseOneServer(String serverId);
void updateDeviceKeepaliveTimeStamp(List<Device> deviceList);
List<Long> getDeviceKeepaliveTimeStamp(String deviceId, Integer count);
void updateDeviceRegisterTimeStamp(List<Device> deviceList);
List<Long> getDeviceRegisterTimeStamp(String deviceId, Integer count);
}

View File

@ -20,8 +20,11 @@ import com.genersoft.iot.vmp.utils.JsonUtil;
import com.genersoft.iot.vmp.utils.SystemInfoUtils;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.jspecify.annotations.NonNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
@ -46,6 +49,9 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private RedisTemplate<String, Long> longRedisTemplate;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@ -184,6 +190,17 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
return device;
}
@Override
public List<Device> getDeviceList(Set<String> deviceIds) {
String key = VideoManagerConstants.DEVICE_PREFIX;
List<Device> deviceList = new ArrayList<>();
List<Object> objectList = redisTemplate.opsForHash().multiGet(key, Arrays.asList(deviceIds.toArray()));
for (Object object : objectList) {
deviceList.add((Device)object);
}
return deviceList;
}
@Override
public void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo) {
String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetting.getServerId();
@ -531,4 +548,84 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
}
return (String) range.iterator().next();
}
@Override
public void updateDeviceKeepaliveTimeStamp(List<Device> deviceList) {
if (deviceList == null || deviceList.isEmpty()) {
return;
}
// 使用 SessionCallback 保证批量操作在同一个连接中执行
SessionCallback<Boolean> sessionCallback = new SessionCallback<>() {
@Override
// 注意这里直接写死 String, String 覆盖接口的 K, V
public Boolean execute(@NonNull RedisOperations operations) {
// 1. 批量添加心跳数据到列表尾部
for (Device device : deviceList) {
Duration duration = Duration.ofHours(1);
String key = VideoManagerConstants.DEVICE_KEEPALIVE_PREFIX + device.getDeviceId();
operations.opsForList().rightPush(key, device.getKeepaliveTimeStamp());
// 2. 截取列表只保留最新 100
operations.opsForList().trim(key, -100, -1);
// 为整个列表 Key 设置过期时间核心覆盖式设置每次更新心跳都重置过期时间
operations.expire(key, duration);
}
return true;
}
};
longRedisTemplate.execute(sessionCallback);
}
@Override
public List<Long> getDeviceKeepaliveTimeStamp(String deviceId, Integer count) {
if (deviceId == null ) {
return List.of();
}
if (count == null) {
count = 20;
}
return longRedisTemplate.opsForList().range(VideoManagerConstants.DEVICE_KEEPALIVE_PREFIX + deviceId, -count - 1, -1);
}
@Override
public void updateDeviceRegisterTimeStamp(List<Device> deviceList) {
if (deviceList == null || deviceList.isEmpty()) {
return;
}
// 使用 SessionCallback 保证批量操作在同一个连接中执行
SessionCallback<Boolean> sessionCallback = new SessionCallback<>() {
@Override
// 注意这里直接写死 String, String 覆盖接口的 K, V
public Boolean execute(@NonNull RedisOperations operations) {
// 1. 批量添加心跳数据到列表尾部
for (Device device : deviceList) {
Duration duration = Duration.ofHours(3);
String key = VideoManagerConstants.DEVICE_REGISTER_PREFIX + device.getDeviceId();
operations.opsForList().rightPush(key, device.getRegisterTimeStamp());
// 2. 截取列表只保留最新 100
operations.opsForList().trim(key, -100, -1);
// 为整个列表 Key 设置过期时间核心覆盖式设置每次更新心跳都重置过期时间
operations.expire(key, duration);
}
return true;
}
};
longRedisTemplate.execute(sessionCallback);
}
@Override
public List<Long> getDeviceRegisterTimeStamp(String deviceId, Integer count) {
if (deviceId == null ) {
return List.of();
}
if (count == null) {
count = 20;
}
return longRedisTemplate.opsForList().range(VideoManagerConstants.DEVICE_REGISTER_PREFIX + deviceId, -count - 1, -1);
}
}

View File

@ -77,7 +77,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
/**
* 流到来的处理
*/
@Async("taskExecutor")
@Async
@Transactional
@org.springframework.context.event.EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
@ -89,7 +89,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
/**
* 流离开的处理
*/
@Async("taskExecutor")
@Async
@EventListener
@Transactional
public void onApplicationEvent(MediaDepartureEvent event) {
@ -101,7 +101,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
/**
* 流未找到的处理
*/
@Async("taskExecutor")
@Async
@EventListener
public void onApplicationEvent(MediaNotFoundEvent event) {
if (MediaApp.isKeywords(event.getApp())) {
@ -119,7 +119,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
/**
* 流媒体节点上线
*/
@Async("taskExecutor")
@Async
@EventListener
@Transactional
public void onApplicationEvent(MediaServerOnlineEvent event) {
@ -129,7 +129,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
/**
* 流媒体节点离线
*/
@Async("taskExecutor")
@Async
@EventListener
@Transactional
public void onApplicationEvent(MediaServerOfflineEvent event) {

View File

@ -64,7 +64,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
/**
* 流到来的处理
*/
@Async("taskExecutor")
@Async
@EventListener
@Transactional
public void onApplicationEvent(MediaArrivalEvent event) {
@ -119,7 +119,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
/**
* 流离开的处理
*/
@Async("taskExecutor")
@Async
@EventListener
@Transactional
public void onApplicationEvent(MediaDepartureEvent event) {
@ -156,7 +156,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
/**
* 流媒体节点上线
*/
@Async("taskExecutor")
@Async
@EventListener
@Transactional
public void onApplicationEvent(MediaServerOnlineEvent event) {
@ -166,7 +166,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
/**
* 流媒体节点离线
*/
@Async("taskExecutor")
@Async
@EventListener
@Transactional
public void onApplicationEvent(MediaServerOfflineEvent event) {

View File

@ -42,49 +42,4 @@ public class IpPortUtil {
throw new IllegalArgumentException("无效的IP地址: " + ip, e);
}
}
// 测试用例
public static void main(String[] args) {
// IPv4测试
String ipv4 = "192.168.1.1";
String port1 = "8080";
System.out.println(concatenateIpAndPort(ipv4, port1)); // 输出: 192.168.1.1:8080
// IPv6测试
String ipv6 = "2001:0db8:85a3:0000:0000:8a2e:0370:7334";
String port2 = "80";
System.out.println(concatenateIpAndPort(ipv6, port2)); // 输出: [2001:0db8:85a3:0000:0000:8a2e:0370:7334]:80
// 压缩格式IPv6测试
String ipv6Compressed = "2001:db8::1";
System.out.println(concatenateIpAndPort(ipv6Compressed, port2)); // 输出: [2001:db8::1]:80
// 无效IP测试
try {
System.out.println(concatenateIpAndPort("invalid.ip", "1234"));
} catch (IllegalArgumentException e) {
System.out.println("捕获到预期异常: " + e.getMessage());
}
// 无效端口测试 - 非数字
try {
System.out.println(concatenateIpAndPort(ipv4, "abc"));
} catch (IllegalArgumentException e) {
System.out.println("捕获到预期异常: " + e.getMessage());
}
// 无效端口测试 - 超出范围
try {
System.out.println(concatenateIpAndPort(ipv4, "70000"));
} catch (IllegalArgumentException e) {
System.out.println("捕获到预期异常: " + e.getMessage());
}
// 空端口测试
try {
System.out.println(concatenateIpAndPort(ipv4, ""));
} catch (IllegalArgumentException e) {
System.out.println("捕获到预期异常: " + e.getMessage());
}
}
}
}

View File

@ -1,5 +1,8 @@
spring:
application:
name: wvp
threads:
virtual:
enabled: true
profiles:
active: dev

View File

@ -255,4 +255,24 @@ export function queryDeviceTree(params, deviceId) {
}
})
}
export function getKeepaliveTimeStatistics({ deviceId, count }) {
return request({
method: 'get',
url: '/api/device/query/statistics/keepalive',
params: {
deviceId: deviceId,
count: count
}
})
}
export function getRegisterTimeStatistics({ deviceId, count }) {
return request({
method: 'get',
url: '/api/device/query/statistics/register',
params: {
deviceId: deviceId,
count: count
}
})
}

View File

@ -2,7 +2,7 @@ import {
add,
changeChannelAudio,
deleteDevice,
deviceRecord,
deviceRecord, getKeepaliveTimeStatistics, getRegisterTimeStatistics,
queryBasicParam,
queryChannelOne,
queryChannels,
@ -252,6 +252,26 @@ const actions = {
reject(error)
})
})
},
getKeepaliveTimeStatistics({ commit }, params) {
return new Promise((resolve, reject) => {
getKeepaliveTimeStatistics(params).then(response => {
const { data } = response
resolve(data)
}).catch(error => {
reject(error)
})
})
},
getRegisterTimeStatistics({ commit }, params) {
return new Promise((resolve, reject) => {
getRegisterTimeStatistics(params).then(response => {
const { data } = response
resolve(data)
}).catch(error => {
reject(error)
})
})
}
}

View File

@ -1,6 +1,6 @@
@font-face {
font-family: "iconfont"; /* Project id 1291092 */
src: url('iconfont.woff2?t=1758784486763') format('woff2')
src: url('iconfont.woff2?t=1769409737891') format('woff2')
}
.iconfont {
@ -11,6 +11,18 @@
-moz-osx-font-smoothing: grayscale;
}
.icon-xintiao:before {
content: "\e7f4";
}
.icon-register:before {
content: "\e7f5";
}
.icon-tongji-Statistics:before {
content: "\e7f3";
}
.icon-mti-duobianxingxuan:before {
content: "\e9e7";
}

Binary file not shown.

View File

@ -64,3 +64,12 @@ div:focus {
.app-container {
padding: 20px;
}
.iconfont-14 {
font-family: "iconfont" !important;
font-size: 14px;
font-style: normal;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
margin-right: 5px;
}

View File

@ -110,8 +110,28 @@
/>
</template>
</el-table-column>
<el-table-column prop="keepaliveTime" label="最近心跳" min-width="140" />
<el-table-column prop="registerTime" label="最近注册" min-width="140" />
<el-table-column label="统计" min-width="160">
<template v-slot:default="scope">
<el-button
type="text"
size="mini"
:disabled="scope.row.online===0"
icon="iconfont-14 icon-xintiao"
title="心跳时间统计"
@click="getKeepaliveTimeStatistics(scope.row.deviceId)"
>心跳
</el-button>
<el-button
type="text"
size="mini"
:disabled="scope.row.online===0"
icon="iconfont-14 icon-register"
title="注册时间统计"
@click="getRegisterTimeStatistics(scope.row.deviceId)"
>注册
</el-button>
</template>
</el-table-column>
<el-table-column label="操作" min-width="300" fixed="right">
<template v-slot:default="scope">
<el-button
@ -169,6 +189,7 @@
<deviceEdit ref="deviceEdit" />
<syncChannelProgress ref="syncChannelProgress" />
<configInfo ref="configInfo" />
<timeStatistics ref="timeStatistics" />
</div>
</template>
@ -176,6 +197,7 @@
import deviceEdit from './edit.vue'
import syncChannelProgress from '../dialog/SyncChannelProgress.vue'
import configInfo from '../dialog/configInfo.vue'
import timeStatistics from './timeStatistics.vue'
import Vue from 'vue'
export default {
@ -183,7 +205,8 @@ export default {
components: {
configInfo,
deviceEdit,
syncChannelProgress
syncChannelProgress,
timeStatistics
},
data() {
return {
@ -463,6 +486,12 @@ export default {
message: error.message
})
})
},
getKeepaliveTimeStatistics: function(deviceId) {
this.$refs.timeStatistics.openDialog('心跳时间统计', 'device/getKeepaliveTimeStatistics', deviceId, 60)
},
getRegisterTimeStatistics: function(deviceId) {
this.$refs.timeStatistics.openDialog('注册时间统计', 'device/getRegisterTimeStatistics', deviceId, 10)
}
}
}

View File

@ -0,0 +1,162 @@
<template>
<div id="timeStatistics" v-loading="loading">
<el-dialog
v-el-drag-dialog
:title="title"
width="60%"
top="2rem"
:close-on-click-modal="false"
:visible.sync="showDialog"
:destroy-on-close="true"
@close="close"
>
<div style="margin-right: 20px;">
<el-row type="flex" justify="space-between" align="middle" style="margin-bottom: 12px;">
<div>
<el-button-group>
<el-button type="primary" :plain="viewMode !== 'table'" size="mini" @click="viewMode = 'table'">表格</el-button>
<el-button type="primary" :plain="viewMode !== 'chart'" size="mini" @click="viewMode = 'chart'">折线图</el-button>
</el-button-group>
<el-button icon="el-icon-refresh" size="mini" @click="fetchData" style="margin-left: 8px;">刷新</el-button>
</div>
<el-form :inline="true" size="mini">
<el-form-item label="数量">
<el-input-number v-model="count" :min="1" :max="500" @change="fetchData" />
</el-form-item>
</el-form>
</el-row>
<el-table
v-if="viewMode === 'table'"
:data="tableData"
border
stripe
size="mini"
height="400px"
style="width: 100%;"
>
<el-table-column prop="time" label="时间" min-width="180" />
<el-table-column prop="timeDiff" label="间隔(秒)" min-width="120" />
</el-table>
<ve-line
v-else
:data="chartData"
:extend="extend"
height="400px"
:legend-visible="false"
/>
</div>
<div style="margin-top: 12px; text-align: right;">
<span>最大波动{{ timeDiffDelta }} </span>
</div>
</el-dialog>
</div>
</template>
<script>
import moment from 'moment/moment'
import veLine from 'v-charts/lib/line'
import request from '@/utils/request'
import elDragDialog from '@/directive/el-drag-dialog'
export default {
name: 'TimeStatistics',
components: { veLine },
directives: { elDragDialog },
data() {
return {
title: null,
url: null,
deviceId: null,
count: 50,
showDialog: false,
loading: false,
viewMode: 'table',
list: [],
extend: {
grid: { right: '30px', containLabel: true },
xAxis: {
boundaryGap: false,
axisLabel: {
formatter: (v) => moment(v).format('HH:mm:ss')
}
},
yAxis: {
type: 'value',
min: 0,
splitNumber: 6,
axisLabel: { formatter: (v) => `${v}` }
},
tooltip: {
trigger: 'axis',
formatter: (data) => {
if (!data || !data.length) return ''
const [item] = data
return `${moment(item.data[0]).format('HH:mm:ss')}<br/>间隔:${item.data[1]}`
}
},
series: {
itemStyle: { color: '#409EFF' }
}
}
}
},
computed: {
chartData() {
return {
columns: ['time', 'timeDiff'],
rows: this.list
}
},
tableData() {
return this.list.slice().reverse();
},
timeDiffDelta() {
if (!this.list.length) return 0
const nums = this.list
.map(item => Number(item.timeDiff))
.filter(v => !Number.isNaN(v))
if (!nums.length) return 0
const max = Math.max(...nums)
const min = Math.min(...nums)
return (max - min).toFixed(2)
}
},
methods: {
openDialog(title, url, deviceId, count = 50) {
this.title = title
this.url = url
this.deviceId = deviceId
this.count = count
this.showDialog = true
this.viewMode = 'table'
this.fetchData()
},
fetchData() {
console.log(this.url)
if (!this.url || !this.deviceId) return
this.loading = true
this.$store.dispatch(this.url, {
deviceId: this.deviceId,
count: this.count
}).then(data => {
this.list = data
}).catch((error) => {
this.$message.error({
showClose: true,
message: error.message
})
})
},
close() {
this.title = null
this.url = null
this.deviceId = null
this.list = []
this.showDialog = false
this.loading = false
}
}
}
</script>

View File

@ -12,8 +12,6 @@ create table IF NOT EXISTS wvp_device
transport character varying(50) COMMENT '信令传输协议TCP/UDP',
stream_mode character varying(50) COMMENT '拉流方式(主动/被动)',
on_line bool default false COMMENT '在线状态',
register_time character varying(50) COMMENT '注册时间',
keepalive_time character varying(50) COMMENT '最近心跳时间',
ip character varying(50) COMMENT '设备IP地址',
create_time character varying(50) COMMENT '创建时间',
update_time character varying(50) COMMENT '更新时间',
@ -207,8 +205,7 @@ create table IF NOT EXISTS wvp_media_server
record_path character varying(255) COMMENT '录像目录',
record_day integer default 7 COMMENT '录像保留天数',
transcode_suffix character varying(255) COMMENT '转码指令后缀',
server_id character varying(50) COMMENT '对应信令服务器ID',
constraint uk_media_server_unique_ip_http_port unique (ip, http_port, server_id)
server_id character varying(50) COMMENT '对应信令服务器ID'
);
-- 上级国标平台注册信息

View File

@ -11,8 +11,6 @@ create table IF NOT EXISTS wvp_device
transport character varying(50),
stream_mode character varying(50),
on_line bool default false,
register_time character varying(50),
keepalive_time character varying(50),
ip character varying(50),
create_time character varying(50),
update_time character varying(50),
@ -49,8 +47,6 @@ COMMENT ON COLUMN wvp_device.firmware IS '固件版本号';
COMMENT ON COLUMN wvp_device.transport IS '信令传输协议TCP/UDP';
COMMENT ON COLUMN wvp_device.stream_mode IS '拉流方式(主动/被动)';
COMMENT ON COLUMN wvp_device.on_line IS '在线状态';
COMMENT ON COLUMN wvp_device.register_time IS '注册时间';
COMMENT ON COLUMN wvp_device.keepalive_time IS '最近心跳时间';
COMMENT ON COLUMN wvp_device.ip IS '设备IP地址';
COMMENT ON COLUMN wvp_device.create_time IS '创建时间';
COMMENT ON COLUMN wvp_device.update_time IS '更新时间';
@ -355,7 +351,6 @@ create table IF NOT EXISTS wvp_media_server
record_day integer default 7,
transcode_suffix character varying(255),
server_id character varying(50),
constraint uk_media_server_unique_ip_http_port unique (ip, http_port, server_id)
);
COMMENT ON TABLE wvp_media_server IS '媒体服务器(如 ZLM节点信息';
COMMENT ON COLUMN wvp_media_server.id IS '媒体服务器ID';

View File

@ -118,6 +118,29 @@ call wvp_20251027();
DROP PROCEDURE wvp_20251027;
DELIMITER ;
drop index uk_media_server_unique_ip_http_port on wvp_media_server;
/*
* 202601025
*/
DELIMITER // -- 重定义分隔符避免分号冲突
CREATE PROCEDURE `wvp_202601025`()
BEGIN
IF EXISTS (SELECT column_name FROM information_schema.columns
WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_device' and column_name = 'register_time')
THEN
ALTER TABLE wvp_device DROP register_time;
END IF;
IF EXISTS (SELECT column_name FROM information_schema.columns
WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_device' and column_name = 'keepalive_time')
THEN
ALTER TABLE wvp_device DROP keepalive_time;
END IF;
END; //
call wvp_202601025();
DROP PROCEDURE wvp_202601025;
DELIMITER ;

View File

@ -42,3 +42,8 @@ ALTER table wvp_device_channel ADD COLUMN IF NOT EXISTS enable_broadcast integer
ALTER table wvp_device_channel ADD COLUMN IF NOT EXISTS map_level integer default 0;
ALTER table wvp_common_group ADD COLUMN IF NOT EXISTS alias varchar(255) default null;
ALTER table wvp_stream_proxy DROP COLUMN IF EXISTS enable_remove_none_reader;
drop index uk_media_server_unique_ip_http_port on wvp_media_server;
ALTER table wvp_device DROP COLUMN IF EXISTS register_time;
ALTER table wvp_device DROP COLUMN IF EXISTS keepalive_time;