diff --git a/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java b/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java index 21e4203e4..8869b3f45 100644 --- a/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java +++ b/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java @@ -5,9 +5,11 @@ import java.util.logging.LogManager; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; import tk.mybatis.spring.annotation.MapperScan; @SpringBootApplication +@EnableScheduling @MapperScan("com.genersoft.iot.vmp") public class VManageBootstrap extends LogManager { public static void main(String[] args) { diff --git a/src/main/java/com/genersoft/iot/vmp/common/DataCatch.java b/src/main/java/com/genersoft/iot/vmp/common/DataCatch.java new file mode 100644 index 000000000..ee2a3f091 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/common/DataCatch.java @@ -0,0 +1,105 @@ +package com.genersoft.iot.vmp.common; + +import com.genersoft.iot.vmp.gb28181.bean.RecordItem; + +import java.util.*; + +public class DataCatch { + + HashMap data = new HashMap<>(); + + private boolean job = false; + private DataCatch() { + } + + private static volatile DataCatch instance = null; + + public static DataCatch getInstance() { + if (instance == null) { + synchronized (DataCatch.class) { + if (instance == null) { + instance = new DataCatch(); + } + } + } + return instance; + } + + public void dataCheck() { + while (job) { + try { + for (String key : data.keySet()) { + Date now = new Date(); + DataCatchForRecordItemList dl = data.get(key); + if (now.getTime() - dl.getCreateTime().getTime() > dl.getTimeout()) { + data.remove(key); + if (data.size() == 0) job = false; + } + } + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public static void setInstance(DataCatch instance) { + DataCatch.instance = instance; + } + + public boolean hasKey(String key) { + return data.containsKey(key); + } + + public void del(String key) { + data.remove(key); + if (data.size() == 0) job = false; + } + + public void set(String key, List recordItems, int timeout) { + DataCatchForRecordItemList dataCatchForRecordItemList = new DataCatchForRecordItemList(); + dataCatchForRecordItemList.setCreateTime(new Date()); + dataCatchForRecordItemList.setRecordItems(recordItems); + dataCatchForRecordItemList.setTimeout(timeout); + data.put(key, dataCatchForRecordItemList); + if (!job){ + // 启动定时任务 + job = true; + dataCheck(); + } + } + + public List get(String key) { + return data.get(key).getRecordItems(); + } + + class DataCatchForRecordItemList{ + private List recordItems; + private Date createTime; + private int timeout; + + public List getRecordItems() { + return recordItems; + } + + public void setRecordItems(List recordItems) { + this.recordItems = recordItems; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public int getTimeout() { + return timeout; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java index 15b1053d8..d058ce5a0 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java @@ -1,9 +1,7 @@ package com.genersoft.iot.vmp.gb28181.bean; -import java.sql.Date; -import java.util.List; -import java.util.Map; +import java.util.Date; public class Device { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/DeviceOffLineDetector.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/DeviceOffLineDetector.java deleted file mode 100644 index f076e4b12..000000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/DeviceOffLineDetector.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.event; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import com.genersoft.iot.vmp.common.VideoManagerConstants; - -/** - * @Description:设备离在线状态检测器,用于检测设备状态 - * @author: swwheihei - * @date: 2020年5月13日 下午2:40:29 - */ -@Component -public class DeviceOffLineDetector { - - @Autowired - private RedisUtil redis; - - public boolean isOnline(String deviceId) { - String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + deviceId; - return redis.hasKey(key); - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/DeviceStatusManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/DeviceStatusManager.java deleted file mode 100644 index c3d8d3c4f..000000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/DeviceStatusManager.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.event; - -import com.genersoft.iot.vmp.gb28181.bean.Device; -import org.springframework.stereotype.Component; - -@Component -public class DeviceStatusManager { - - private Device -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/DeviceStatusManagerJob.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/DeviceStatusManagerJob.java new file mode 100644 index 000000000..7d6842a2a --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/DeviceStatusManagerJob.java @@ -0,0 +1,53 @@ +package com.genersoft.iot.vmp.gb28181.event; + +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.storager.VideoManagerStoragerService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.Date; +import java.util.List; + +/** + * 设备超时离线检测 + */ +@Component +public class DeviceStatusManagerJob { + + private final Logger logger = LoggerFactory.getLogger(DeviceStatusManagerJob.class); + + @Autowired + private VideoManagerStoragerService storagerService; + + @Autowired + private EventPublisher publisher; + + @Value("${sip.deviceOfflineTimeout}") + private int timeout; + + //表示方法执行完成后5秒 + @Scheduled(fixedDelay = 1000) + public void checkOut() throws InterruptedException { + // 查询在线的设备 + List devices = storagerService.queryVideoDeviceList(null, "1"); + Date now = new Date(); + for (int i = 0; i < devices.size(); i++) { + // 超过 deviceOfflineTimeout 默认180, 设置离线 + Device device = devices.get(i); + if (now.getTime() - device.getLoginTime().getTime() > timeout) { + String deviceId = device.getDeviceId(); + storagerService.outline(deviceId); + logger.info(deviceId + "设备已经超时离线"); + // 发送设备离线事件 + publisher.outlineEventPublish(deviceId, VideoManagerConstants.EVENT_OUTLINE_TIMEOUT); + } + } + + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepliveTimeoutListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepliveTimeoutListener.java deleted file mode 100644 index 4b5e571ed..000000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepliveTimeoutListener.java +++ /dev/null @@ -1,44 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.event.offline; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; -import org.springframework.data.redis.listener.RedisMessageListenerContainer; -import org.springframework.stereotype.Component; - -import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; - -/** - * @Description:设备心跳超时监听,借助redis过期特性,进行监听,监听到说明设备心跳超时,发送离线事件 - * @author: swwheihei - * @date: 2020年5月6日 上午11:35:46 - */ -@Component -public class KeepliveTimeoutListener extends KeyExpirationEventMessageListener { - - @Autowired - private EventPublisher publisher; - - public KeepliveTimeoutListener(RedisMessageListenerContainer listenerContainer) { - super(listenerContainer); - } - - /** - * 监听失效的key,key格式为keeplive_deviceId - * @param message - * @param pattern - */ - @Override - public void onMessage(Message message, byte[] pattern) { - // 获取失效的key - String expiredKey = message.toString(); - if(!expiredKey.startsWith(VideoManagerConstants.KEEPLIVEKEY_PREFIX)){ - System.out.println("收到redis过期监听,但开头不是"+VideoManagerConstants.KEEPLIVEKEY_PREFIX+",忽略"); - return; - } - - String deviceId = expiredKey.substring(VideoManagerConstants.KEEPLIVEKEY_PREFIX.length(),expiredKey.length()); - publisher.outlineEventPublish(deviceId, VideoManagerConstants.EVENT_OUTLINE_TIMEOUT); - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java index 01110db1f..7ae9bf296 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.event.offline; +import com.genersoft.iot.vmp.gb28181.event.online.OnlineEventListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -23,9 +24,6 @@ public class OfflineEventListener implements ApplicationListener { @Autowired private VideoManagerStoragerServiceImpl storager; - - @Autowired - private RedisUtil redis; @Override public void onApplicationEvent(OfflineEvent event) { @@ -34,23 +32,15 @@ public class OfflineEventListener implements ApplicationListener { logger.debug("设备离线事件触发,deviceId:" + event.getDeviceId() + ",from:" + event.getFrom()); } - String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + event.getDeviceId(); - - switch (event.getFrom()) { - // 心跳超时触发的离线事件,说明redis中已删除,无需处理 - case VideoManagerConstants.EVENT_OUTLINE_TIMEOUT: - break; - // 设备主动注销触发的离线事件,需要删除redis中的超时监听 - case VideoManagerConstants.EVENT_OUTLINE_UNREGISTER: - redis.del(key); - break; - default: - boolean exist = redis.hasKey(key); - if (exist) { - redis.del(key); - } - } - +// switch (event.getFrom()) { +// // 心跳超时触发的离线事件,说明redis中已删除,无需处理 +// case VideoManagerConstants.EVENT_OUTLINE_TIMEOUT: +// // 设备主动注销触发的离线事件,需要删除redis中的超时监听 +// case VideoManagerConstants.EVENT_OUTLINE_UNREGISTER: +// break; +// default: +// storager.outline(event.getDeviceId()); +// } // 处理离线监听 storager.outline(event.getDeviceId()); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java index fd6bb07a4..1ef1cb2df 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.event.online; +import com.genersoft.iot.vmp.gb28181.bean.Device; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -9,6 +10,9 @@ import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.storager.VideoManagerStoragerServiceImpl; +import java.util.Date; + + /** * @Description: 在线事件监听器,监听到离线后,修改设备离在线状态。 设备在线有两个来源: * 1、设备主动注销,发送注销指令,{@link com.genersoft.iot.vmp.gb28181.transmit.request.impl.RegisterRequestProcessor} @@ -23,9 +27,6 @@ public class OnlineEventListener implements ApplicationListener { @Autowired private VideoManagerStoragerServiceImpl storager; - - @Autowired - private RedisUtil redis; @Override public void onApplicationEvent(OnlineEvent event) { @@ -37,29 +38,25 @@ public class OnlineEventListener implements ApplicationListener { String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + event.getDeviceId(); boolean needUpdateStorager = false; + Device device = storager.queryVideoDevice(event.getDeviceId()); + if (device ==null) { + logger.error("在线事件未找到设备: " + event.getDeviceId()); + return; + } switch (event.getFrom()) { - // 注册时触发的在线事件,先在redis中增加超时超时监听 - case VideoManagerConstants.EVENT_ONLINE_REGISTER: - // TODO 超时时间暂时写死为180秒 - redis.set(key, event.getDeviceId(), 180); - needUpdateStorager = true; - break; - // 设备主动发送心跳触发的离线事件 - case VideoManagerConstants.EVENT_ONLINE_KEEPLIVE: - boolean exist = redis.hasKey(key); - // 先判断是否还存在,当设备先心跳超时后又发送心跳时,redis没有监听,需要增加 - if (!exist) { - needUpdateStorager = true; - redis.set(key, event.getDeviceId(), 180); - } else { - redis.expire(key, 180); - } - break; - } - - if (needUpdateStorager) { - // 处理离线监听 - storager.online(event.getDeviceId()); + // 注册时触发的在线事件,修数据库device在线, 并更新logginTime + case VideoManagerConstants.EVENT_ONLINE_REGISTER: + // 设备主动发送心跳触发的在线事件 + case VideoManagerConstants.EVENT_ONLINE_KEEPLIVE: + setDeviceOnline(device); + break; } + + } + + public void setDeviceOnline(Device device) { + device.setOnline(1); + device.setLoginTime(new Date()); + storager.updateDevice(device); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java index 209eae2cc..3a19331db 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java @@ -14,7 +14,6 @@ import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.auth.RegisterLogicHandler; -import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; @@ -59,15 +58,10 @@ public class SIPProcessorFactory { @Autowired private SIPCommander cmder; - - @Autowired - private RedisUtil redis; - + @Autowired private DeferredResultHolder deferredResultHolder; - - @Autowired - private DeviceOffLineDetector offLineDetector; + @Autowired private InviteResponseProcessor inviteResponseProcessor; @@ -130,9 +124,7 @@ public class SIPProcessorFactory { processor.setTcpSipProvider(getTcpSipProvider()); processor.setUdpSipProvider(getUdpSipProvider()); processor.setPublisher(publisher); - processor.setRedis(redis); processor.setDeferredResultHolder(deferredResultHolder); - processor.setOffLineDetector(offLineDetector); processor.setCmder(cmder); processor.setStorager(storager); return processor; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java index a8e1eae97..b6936fea5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java @@ -43,9 +43,8 @@ public class SIPRequestHeaderProvider { public Request createMessageRequest(Device device, String content, String viaTag, String fromTag, String toTag) throws ParseException, InvalidArgumentException, PeerUnavailableException { Request request = null; - Host host = device.getHost(); // sipuri - SipURI requestURI = sipFactory.createAddressFactory().createSipURI(device.getDeviceId(), host.getAddress()); + SipURI requestURI = sipFactory.createAddressFactory().createSipURI(device.getDeviceId(), device.getAddress()); // via ArrayList viaHeaders = new ArrayList(); ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(sipConfig.getSipIp(), sipConfig.getSipPort(), @@ -78,13 +77,12 @@ public class SIPRequestHeaderProvider { public Request createInviteRequest(Device device, String channelId, String content, String viaTag, String fromTag, String toTag, String ssrc) throws ParseException, InvalidArgumentException, PeerUnavailableException { Request request = null; - Host host = device.getHost(); //请求行 - SipURI requestLine = sipFactory.createAddressFactory().createSipURI(channelId, host.getAddress()); + SipURI requestLine = sipFactory.createAddressFactory().createSipURI(channelId, device.getAddress()); //via ArrayList viaHeaders = new ArrayList(); // ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(sipConfig.getSipIp(), sipConfig.getSipPort(), device.getTransport(), viaTag); - ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(device.getHost().getIp(), device.getHost().getPort(), device.getTransport(), viaTag); + ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(device.getIp(), device.getPort(), device.getTransport(), viaTag); viaHeader.setRPort(); viaHeaders.add(viaHeader); //from @@ -125,13 +123,12 @@ public class SIPRequestHeaderProvider { public Request createPlaybackInviteRequest(Device device, String channelId, String content, String viaTag, String fromTag, String toTag) throws ParseException, InvalidArgumentException, PeerUnavailableException { Request request = null; - Host host = device.getHost(); //请求行 - SipURI requestLine = sipFactory.createAddressFactory().createSipURI(device.getDeviceId(), host.getAddress()); + SipURI requestLine = sipFactory.createAddressFactory().createSipURI(device.getDeviceId(), device.getAddress()); //via ArrayList viaHeaders = new ArrayList(); // ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(sipConfig.getSipIp(), sipConfig.getSipPort(), device.getTransport(), viaTag); - ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(device.getHost().getIp(), device.getHost().getPort(), device.getTransport(), viaTag); + ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(device.getIp(), device.getPort(), device.getTransport(), viaTag); viaHeader.setRPort(); viaHeaders.add(viaHeader); //from diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 9fab4a69c..6426b685b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -319,8 +319,8 @@ public class SIPCommander implements ISIPCommander { StreamInfo streamInfo = new StreamInfo(); streamInfo.setSsrc(ssrc); - streamInfo.setCahnnelId(channelId); - streamInfo.setDeviceID(device.getDeviceId()); + streamInfo.setChannelId(channelId); + streamInfo.setDeviceId(device.getDeviceId()); storager.startPlay(streamInfo); return streamInfo; } catch ( SipException | ParseException | InvalidArgumentException e) { @@ -386,8 +386,8 @@ public class SIPCommander implements ISIPCommander { StreamInfo streamInfo = new StreamInfo(); streamInfo.setSsrc(ssrc); - streamInfo.setCahnnelId(channelId); - streamInfo.setDeviceID(device.getDeviceId()); + streamInfo.setChannelId(channelId); + streamInfo.setDeviceId(device.getDeviceId()); boolean b = storager.startPlayback(streamInfo); return streamInfo; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java index 9fa38f165..9da62a274 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java @@ -10,6 +10,7 @@ import javax.sip.SipException; import javax.sip.message.Request; import javax.sip.message.Response; +import com.genersoft.iot.vmp.common.DataCatch; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Element; @@ -22,7 +23,6 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; import com.genersoft.iot.vmp.gb28181.bean.RecordItem; -import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; @@ -48,11 +48,8 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { private EventPublisher publisher; - private RedisUtil redis; - private DeferredResultHolder deferredResultHolder; - private DeviceOffLineDetector offLineDetector; private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_"; @@ -139,7 +136,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { deferredResultHolder.invokeResult(msg); // 回复200 OK responseAck(evt); - if (offLineDetector.isOnline(deviceId)) { + if (storager.isOnline(deviceId)) { publisher.onlineEventPublish(deviceId, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE); } } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { @@ -230,7 +227,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { deferredResultHolder.invokeResult(msg); // 回复200 OK responseAck(evt); - if (offLineDetector.isOnline(deviceId)) { + if (storager.isOnline(deviceId)) { publisher.onlineEventPublish(deviceId, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE); } } @@ -267,7 +264,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { cmder.catalogQuery(device); // 回复200 OK responseAck(evt); - if (offLineDetector.isOnline(deviceId)) { + if (storager.isOnline(deviceId)) { publisher.onlineEventPublish(deviceId, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE); } } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { @@ -287,7 +284,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { String deviceId = XmlUtil.getText(rootElement, "DeviceID"); // 回复200 OK responseAck(evt); - if (offLineDetector.isOnline(deviceId)) { + if (storager.isOnline(deviceId)) { publisher.onlineEventPublish(deviceId, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE); } else { } @@ -297,7 +294,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { } /*** - * 收到catalog设备目录列表请求 处理 TODO 过期时间暂时写死180秒,后续与DeferredResult超时时间保持一致 + * 收到catalog设备目录列表请求 处理 * * @param evt */ @@ -356,21 +353,21 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { // 为防止连续请求该设备的录像数据,返回数据错乱,特增加sn进行区分 String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn; // TODO 暂时直接操作redis存储,后续封装专用缓存接口,改为本地内存缓存 - if (redis.hasKey(cacheKey)) { - List previousList = (List) redis.get(cacheKey); + if (DataCatch.getInstance().hasKey(cacheKey)) { + List previousList = (List) DataCatch.getInstance().get(cacheKey); if (previousList != null && previousList.size() > 0) { recordList.addAll(previousList); } // 本分支表示录像列表被拆包,且加上之前的数据还是不够,保存缓存返回,等待下个包再处理 if (recordList.size() < recordInfo.getSumNum()) { logger.info("已获取" + recordList.size() + "项录像数据,共" + recordInfo.getSumNum() + "项"); - redis.set(cacheKey, recordList, 90); + DataCatch.getInstance().set(cacheKey, recordList, 90); return; } else { // 本分支表示录像被拆包,但加上之前的数据够足够,返回响应 // 因设备心跳有监听redis过期机制,为提高性能,此处手动删除 logger.info("录像数据已全部获取"); - redis.del(cacheKey); + DataCatch.getInstance().del(cacheKey); } } else { // 本分支有两种可能:1、录像列表被拆包,且是第一个包,直接保存缓存返回,等待下个包再处理 @@ -378,7 +375,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { logger.info("已获取" + recordList.size() + "项录像数据,共" + recordInfo.getSumNum() + "项"); logger.info("等待后续的包..."); - redis.set(cacheKey, recordList, 90); + DataCatch.getInstance().set(cacheKey, recordList, 90); return; } } @@ -457,16 +454,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { this.publisher = publisher; } - public void setRedis(RedisUtil redis) { - this.redis = redis; - } - public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) { this.deferredResultHolder = deferredResultHolder; } - public void setOffLineDetector(DeviceOffLineDetector offLineDetector) { - this.offLineDetector = offLineDetector; - } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/RegisterRequestProcessor.java index f93d6b4c0..9be7d1ea7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/RegisterRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/RegisterRequestProcessor.java @@ -45,13 +45,11 @@ public class RegisterRequestProcessor extends SIPRequestAbstractProcessor { private VideoManagerStoragerServiceImpl storager; private EventPublisher publisher; - + /*** - * 收到注册请求 处理 - * - * @param request - * 请求消息 - */ + * 收到注册请求 处理 + * @param evt + */ @Override public void process(RequestEvent evt) { try { @@ -103,18 +101,15 @@ public class RegisterRequestProcessor extends SIPRequestAbstractProcessor { received = viaHeader.getHost(); rPort = viaHeader.getPort(); } - // - Host host = new Host(); - host.setIp(received); - host.setPort(rPort); - host.setAddress(received.concat(":").concat(String.valueOf(rPort))); AddressImpl address = (AddressImpl) fromHeader.getAddress(); SipUri uri = (SipUri) address.getURI(); String deviceId = uri.getUser(); device = new Device(); device.setStreamMode("UDP"); device.setDeviceId(deviceId); - device.setHost(host); + device.setIp(received); + device.setPort(rPort); + device.setAddress(received.concat(":").concat(String.valueOf(rPort))); // 注销成功 if (expiresHeader != null && expiresHeader.getExpires() == 0) { registerFlag = 2; diff --git a/src/main/java/com/genersoft/iot/vmp/storager/VideoManagerStoragerMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/VideoManagerStoragerMapper.java index 43d363849..ed6373f20 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/VideoManagerStoragerMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/VideoManagerStoragerMapper.java @@ -88,8 +88,12 @@ public interface VideoManagerStoragerMapper { * * @return DShadow 设备对象 */ - @Select("SELECT de.*, (SELECT count(1) FROM channel WHERE deviceId == de.deviceId) as channelCount FROM device de") - public List queryVideoDevices(); + @Select("") + public List queryVideoDevices(String query, String online); /** diff --git a/src/main/java/com/genersoft/iot/vmp/storager/VideoManagerStoragerService.java b/src/main/java/com/genersoft/iot/vmp/storager/VideoManagerStoragerService.java index 29abe40c3..7b6191761 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/VideoManagerStoragerService.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/VideoManagerStoragerService.java @@ -109,14 +109,14 @@ public interface VideoManagerStoragerService { * * @return List 设备对象数组 */ - public PageInfo queryVideoDeviceList( int page, int count); + public PageInfo queryVideoDeviceList( String query, String online, int page, int count); /** * 获取多个设备 * * @return List 设备对象数组 */ - public List queryVideoDeviceList(); + public List queryVideoDeviceList(String query, String online); /** * 删除设备 @@ -184,4 +184,6 @@ public interface VideoManagerStoragerService { StreamInfo queryPlaybackByDevice(String deviceId, String channelId); StreamInfo queryPlaybackBySSRC(String ssrc); + + boolean isOnline(String deviceId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/VideoManagerStoragerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/VideoManagerStoragerServiceImpl.java index 7aa277ea8..5894ddb34 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/VideoManagerStoragerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/VideoManagerStoragerServiceImpl.java @@ -45,6 +45,11 @@ public class VideoManagerStoragerServiceImpl implements VideoManagerStoragerServ return storageMapper.queryVideoDevice(deviceId) != null; } + @Override + public boolean isOnline(String deviceId) { + return storageMapper.queryVideoDevice(deviceId).getOnline() == 1; + } + @Override public boolean create(Device device) { return storageMapper.addDevice(device) > 0; @@ -89,16 +94,16 @@ public class VideoManagerStoragerServiceImpl implements VideoManagerStoragerServ } @Override - public PageInfo queryVideoDeviceList(int page, int count) { + public PageInfo queryVideoDeviceList(String query, String online, int page, int count) { PageHelper.startPage(page,count); - PageInfo pageInfo = new PageInfo<>(storageMapper.queryVideoDevices()); + PageInfo pageInfo = new PageInfo<>(storageMapper.queryVideoDevices(query, online)); return pageInfo; } @Override - public List queryVideoDeviceList() { - return storageMapper.queryVideoDevices(); + public List queryVideoDeviceList(String query, String online) { + return storageMapper.queryVideoDevices(query, online); } @Override @@ -176,4 +181,8 @@ public class VideoManagerStoragerServiceImpl implements VideoManagerStoragerServ public StreamInfo queryPlaybackBySSRC(String ssrc) { return null; } + + + + } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/device/DeviceController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/device/DeviceController.java index da82acc51..4c95cf829 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/device/DeviceController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/device/DeviceController.java @@ -12,7 +12,6 @@ import org.springframework.web.context.request.async.DeferredResult; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.storager.VideoManagerStoragerServiceImpl; @@ -32,9 +31,7 @@ public class DeviceController { @Autowired private DeferredResultHolder resultHolder; - - @Autowired - private DeviceOffLineDetector offLineDetector; + @GetMapping("/devices/{deviceId}") public ResponseEntity devices(@PathVariable String deviceId){ @@ -54,7 +51,7 @@ public class DeviceController { logger.debug("查询所有视频设备API调用"); } - return storager.queryVideoDeviceList( page, count); + return storager.queryVideoDeviceList(null, null, page, count); } /** @@ -100,7 +97,7 @@ public class DeviceController { logger.debug("设备信息删除API调用,deviceId:" + deviceId); } - if (offLineDetector.isOnline(deviceId)) { + if (storager.isOnline(deviceId)) { return new ResponseEntity("不允许删除在线设备!", HttpStatus.NOT_ACCEPTABLE); } boolean isSuccess = storager.delete(deviceId); diff --git a/src/main/java/com/genersoft/iot/vmp/web/ApiDeviceController.java b/src/main/java/com/genersoft/iot/vmp/web/ApiDeviceController.java index 788b32be1..3ebe4ebc4 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/ApiDeviceController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/ApiDeviceController.java @@ -4,7 +4,6 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; -import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.storager.VideoManagerStoragerServiceImpl; @@ -35,9 +34,6 @@ public class ApiDeviceController { @Autowired private DeferredResultHolder resultHolder; - @Autowired - private DeviceOffLineDetector offLineDetector; - /** * 分页获取设备列表 TODO 现在直接返回,尚未实现分页 * @param start @@ -60,10 +56,10 @@ public class ApiDeviceController { JSONObject result = new JSONObject(); List devices; if (start == null || limit ==null) { - devices = storager.queryVideoDeviceList(); + devices = storager.queryVideoDeviceList(null, null); result.put("DeviceCount", devices.size()); }else { - PageInfo deviceList = storager.queryVideoDeviceList(start/limit, limit); + PageInfo deviceList = storager.queryVideoDeviceList(null, null, start/limit, limit); result.put("DeviceCount", deviceList.getTotal()); devices = deviceList.getList(); } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 1c44d29b3..b2809c711 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -5,7 +5,8 @@ spring: communicate: http datasource: name: wvp - url: jdbc:sqlite::resource:wvp.db +# url: jdbc:sqlite::resource:wvp.db + url: jdbc:/home/lin/work/wvp-GB28181-pro/target/classes/wvp.db username: password: type: com.alibaba.druid.pool.DruidDataSource @@ -17,6 +18,7 @@ spring: # password: # type: com.alibaba.druid.pool.DruidDataSource # driver-class-name: com.mysql.jdbc.Driver +# 分页设置 pagehelper: helperDialect: sqlite supportMethodsArguments: true @@ -33,6 +35,8 @@ sip: id: 34020000002000000001 # 默认设备认证密码,后续扩展使用设备单独密码 password: 12345678 + # 默认设备在线超时时间(单位: 秒) + deviceOfflineTimeout: 180 auth: #32位小写md5加密(默认密码为admin) username: admin @@ -43,7 +47,7 @@ media: #zlm服务器的ip与http端口, 重点: 这是http端口 wanIp: port: 80 secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc - streamNoneReaderDelayMS: 1800000 # 无人观看多久自动关闭流 + streamNoneReaderDelayMS: 1800000 # 无人观看多久自动关闭流(单位: 毫秒) closeWaitRTPInfo: false # 强制关闭等待收到流编码信息后在返回, 设为true可以快速打开播放窗口, 设为false保证返回后流就可以播放 rtp: # 启用udp多端口模式 enable: true