timeSubscribes = new ConcurrentHashMap<>();
// @Scheduled(cron="*/5 * * * * ?") //每五秒执行一次
+// @Scheduled(fixedRate= 100 * 60 * 60 )
@Scheduled(cron="0 0 * * * ?") //每小时执行一次, 每个整点
public void execute(){
logger.info("[定时任务] 清理过期的订阅信息");
@@ -58,11 +61,15 @@ public class SipSubscribe {
this.event = event;
if (event instanceof ResponseEvent) {
ResponseEvent responseEvent = (ResponseEvent)event;
- this.type = "response";
- this.msg = responseEvent.getResponse().getReasonPhrase();
- this.statusCode = responseEvent.getResponse().getStatusCode();
- this.callId = responseEvent.getDialog().getCallId().getCallId();
- this.dialog = responseEvent.getDialog();
+ Response response = responseEvent.getResponse();
+ this.dialog = responseEvent.getDialog();
+ this.type = "response";
+ if (response != null) {
+ this.msg = response.getReasonPhrase();
+ this.statusCode = response.getStatusCode();
+ }
+ this.callId = ((CallIdHeader)response.getHeader(CallIdHeader.NAME)).getCallId();
+
}else if (event instanceof TimeoutEvent) {
TimeoutEvent timeoutEvent = (TimeoutEvent)event;
this.type = "timeout";
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java
index 6522387d9..00926574d 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java
@@ -66,6 +66,7 @@ public class PlatformKeepaliveExpireEventLister implements ApplicationListener(msg.getData(),HttpStatus.OK));
+ result.setResult(ResponseEntity.ok().body(msg.getData()));
}
map.remove(msg.getKey());
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
index 9f4137796..6e96dac31 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.transmit.cmd;
+import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
@@ -121,6 +122,26 @@ public interface ISIPCommander {
void streamByeCmd(String deviceId, String channelId, SipSubscribe.Event okEvent);
void streamByeCmd(String deviceId, String channelId);
+ /**
+ * 回放暂停
+ */
+ void playPauseCmd(Device device, StreamInfo streamInfo);
+
+ /**
+ * 回放恢复
+ */
+ void playResumeCmd(Device device, StreamInfo streamInfo);
+
+ /**
+ * 回放拖动播放
+ */
+ void playSeekCmd(Device device, StreamInfo streamInfo, long seekTime);
+
+ /**
+ * 回放倍速播放
+ */
+ void playSpeedCmd(Device device, StreamInfo streamInfo, Double speed);
+
/**
* 语音广播
*
@@ -235,8 +256,9 @@ public interface ISIPCommander {
* @param device 视频设备
* @param startTime 开始时间,格式要求:yyyy-MM-dd HH:mm:ss
* @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss
+ * @param sn
*/
- boolean recordInfoQuery(Device device, String channelId, String startTime, String endTime);
+ boolean recordInfoQuery(Device device, String channelId, String startTime, String endTime, int sn, SipSubscribe.Event errorEvent);
/**
* 查询报警信息
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
index bb62902a3..98ea7c9c5 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
@@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
import java.text.ParseException;
import java.util.ArrayList;
+import javax.sip.Dialog;
import javax.sip.InvalidArgumentException;
import javax.sip.PeerUnavailableException;
import javax.sip.SipFactory;
@@ -11,6 +12,9 @@ import javax.sip.address.SipURI;
import javax.sip.header.*;
import javax.sip.message.Request;
+import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
+import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -30,6 +34,9 @@ public class SIPRequestHeaderProvider {
@Autowired
private SipFactory sipFactory;
+
+ @Autowired
+ private VideoStreamSessionManager streamSession;
public Request createMessageRequest(Device device, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
Request request = null;
@@ -210,4 +217,50 @@ public class SIPRequestHeaderProvider {
request.setContent(content, contentTypeHeader);
return request;
}
+
+ public Request createInfoRequest(Device device, StreamInfo streamInfo, String content)
+ throws PeerUnavailableException, ParseException, InvalidArgumentException {
+ Request request = null;
+ Dialog dialog = streamSession.getDialog(streamInfo.getDeviceID(), streamInfo.getChannelId());
+
+ SipURI requestLine = sipFactory.createAddressFactory().createSipURI(device.getDeviceId(),
+ device.getHostAddress());
+ // via
+ ArrayList viaHeaders = new ArrayList();
+ ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(device.getIp(), device.getPort(),
+ device.getTransport(), null);
+ viaHeader.setRPort();
+ viaHeaders.add(viaHeader);
+ // from
+ SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(sipConfig.getId(),
+ sipConfig.getDomain());
+ Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI);
+ FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, dialog.getLocalTag());
+ // to
+ SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(streamInfo.getChannelId(),
+ sipConfig.getDomain());
+ Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI);
+ ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, dialog.getRemoteTag());
+
+ // callid
+ CallIdHeader callIdHeader = dialog.getCallId();
+
+ // Forwards
+ MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
+
+ // ceq
+ CSeqHeader cSeqHeader = sipFactory.createHeaderFactory()
+ .createCSeqHeader(InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()), Request.INFO);
+
+ request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INFO, callIdHeader, cSeqHeader,
+ fromHeader, toHeader, viaHeaders, maxForwards);
+ Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory()
+ .createSipURI(sipConfig.getId(), sipConfig.getIp() + ":" + sipConfig.getPort()));
+ request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
+
+ ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application",
+ "MANSRTSP");
+ request.setContent(content, contentTypeHeader);
+ return request;
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
index 7a0e901fe..2f90deed1 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.Device;
@@ -17,6 +18,7 @@ import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache;
import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl;
import gov.nist.javax.sip.message.SIPRequest;
@@ -1194,14 +1196,15 @@ public class SIPCommander implements ISIPCommander {
* @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss
*/
@Override
- public boolean recordInfoQuery(Device device, String channelId, String startTime, String endTime) {
-
+ public boolean recordInfoQuery(Device device, String channelId, String startTime, String endTime, int sn, SipSubscribe.Event errorEvent) {
+
+
try {
StringBuffer recordInfoXml = new StringBuffer(200);
recordInfoXml.append("\r\n");
recordInfoXml.append("\r\n");
recordInfoXml.append("RecordInfo\r\n");
- recordInfoXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n");
+ recordInfoXml.append("" + sn + "\r\n");
recordInfoXml.append("" + channelId + "\r\n");
recordInfoXml.append("" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(startTime) + "\r\n");
recordInfoXml.append("" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(endTime) + "\r\n");
@@ -1218,7 +1221,7 @@ public class SIPCommander implements ISIPCommander {
Request request = headerProvider.createMessageRequest(device, recordInfoXml.toString(),
"z9hG4bK-ViaRecordInfo-" + tm, "fromRec" + tm, null, callIdHeader);
- transmitRequest(device, request);
+ transmitRequest(device, request, errorEvent);
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
return false;
@@ -1486,7 +1489,7 @@ public class SIPCommander implements ISIPCommander {
StringBuffer cmdXml = new StringBuffer(200);
cmdXml.append("\r\n");
cmdXml.append("\r\n");
- cmdXml.append("CataLog\r\n");
+ cmdXml.append("Catalog\r\n");
cmdXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n");
cmdXml.append("" + device.getDeviceId() + "\r\n");
cmdXml.append("\r\n");
@@ -1496,7 +1499,7 @@ public class SIPCommander implements ISIPCommander {
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
- Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "presence" , callIdHeader);
+ Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" , callIdHeader);
transmitRequest(device, request, errorEvent, okEvent);
return true;
@@ -1543,4 +1546,111 @@ public class SIPCommander implements ISIPCommander {
clientTransaction.sendRequest();
return clientTransaction;
}
+
+ /**
+ * 回放暂停
+ */
+ @Override
+ public void playPauseCmd(Device device, StreamInfo streamInfo) {
+ try {
+
+ StringBuffer content = new StringBuffer(200);
+ content.append("PAUSE RTSP/1.0\r\n");
+ content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
+ content.append("PauseTime: now\r\n");
+ Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
+ logger.info(request.toString());
+ ClientTransaction clientTransaction = null;
+ if ("TCP".equals(device.getTransport())) {
+ clientTransaction = tcpSipProvider.getNewClientTransaction(request);
+ } else if ("UDP".equals(device.getTransport())) {
+ clientTransaction = udpSipProvider.getNewClientTransaction(request);
+ }
+ if (clientTransaction != null) {
+ clientTransaction.sendRequest();
+ }
+
+ } catch (SipException | ParseException | InvalidArgumentException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 回放恢复
+ */
+ @Override
+ public void playResumeCmd(Device device, StreamInfo streamInfo) {
+ try {
+ StringBuffer content = new StringBuffer(200);
+ content.append("PLAY RTSP/1.0\r\n");
+ content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
+ content.append("Range: npt=now-\r\n");
+ Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
+ logger.info(request.toString());
+ ClientTransaction clientTransaction = null;
+ if ("TCP".equals(device.getTransport())) {
+ clientTransaction = tcpSipProvider.getNewClientTransaction(request);
+ } else if ("UDP".equals(device.getTransport())) {
+ clientTransaction = udpSipProvider.getNewClientTransaction(request);
+ }
+
+ clientTransaction.sendRequest();
+
+ } catch (SipException | ParseException | InvalidArgumentException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 回放拖动播放
+ */
+ @Override
+ public void playSeekCmd(Device device, StreamInfo streamInfo, long seekTime) {
+ try {
+ StringBuffer content = new StringBuffer(200);
+ content.append("PLAY RTSP/1.0\r\n");
+ content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
+ content.append("Range: npt=" + Math.abs(seekTime) + "-\r\n");
+
+ Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
+ logger.info(request.toString());
+ ClientTransaction clientTransaction = null;
+ if ("TCP".equals(device.getTransport())) {
+ clientTransaction = tcpSipProvider.getNewClientTransaction(request);
+ } else if ("UDP".equals(device.getTransport())) {
+ clientTransaction = udpSipProvider.getNewClientTransaction(request);
+ }
+
+ clientTransaction.sendRequest();
+
+ } catch (SipException | ParseException | InvalidArgumentException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 回放倍速播放
+ */
+ @Override
+ public void playSpeedCmd(Device device, StreamInfo streamInfo, Double speed) {
+ try {
+ StringBuffer content = new StringBuffer(200);
+ content.append("PLAY RTSP/1.0\r\n");
+ content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
+ content.append("Scale: " + String.format("%.1f",speed) + "\r\n");
+ Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
+ logger.info(request.toString());
+ ClientTransaction clientTransaction = null;
+ if ("TCP".equals(device.getTransport())) {
+ clientTransaction = tcpSipProvider.getNewClientTransaction(request);
+ } else if ("UDP".equals(device.getTransport())) {
+ clientTransaction = udpSipProvider.getNewClientTransaction(request);
+ }
+
+ clientTransaction.sendRequest();
+
+ } catch (SipException | ParseException | InvalidArgumentException e) {
+ e.printStackTrace();
+ }
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
index 8363d6f33..127ef29a1 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -25,10 +25,8 @@ import javax.sip.header.ToHeader;
import java.util.HashMap;
import java.util.Map;
-/**
- * @description:ACK请求处理器
- * @author: swwheihei
- * @date: 2020年5月3日 下午5:31:45
+/**
+ * SIP命令类型: ACK请求
*/
@Component
public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
index c97b55a21..feb44c540 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -28,15 +28,14 @@ import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
-/**
- * @description: BYE请求处理器
- * @author: lawrencehj
- * @date: 2021年3月9日
+/**
+ * SIP命令类型: BYE请求
*/
@Component
public class ByeRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
- private Logger logger = LoggerFactory.getLogger(ByeRequestProcessor.class);
+ private final Logger logger = LoggerFactory.getLogger(ByeRequestProcessor.class);
+ private final String method = "BYE";
@Autowired
private ISIPCommander cmder;
@@ -53,8 +52,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired
private IMediaServerService mediaServerService;
- private String method = "BYE";
-
@Autowired
private SIPProcessorObserver sipProcessorObserver;
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java
index 2e98e335f..0a818ee6e 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java
@@ -9,10 +9,8 @@ import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
-/**
- * @description:CANCEL请求处理器
- * @author: swwheihei
- * @date: 2020年5月3日 下午5:32:23
+/**
+ * SIP命令类型: CANCEL请求
*/
@Component
public class CancelRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 5eda75dda..1cb4af597 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -33,10 +33,8 @@ import javax.sip.message.Response;
import java.text.ParseException;
import java.util.Vector;
-/**
- * @description:处理INVITE请求
- * @author: panll
- * @date: 2021年1月14日
+/**
+ * SIP命令类型: INVITE请求
*/
@SuppressWarnings("rawtypes")
@Component
@@ -140,12 +138,21 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// 解析sdp消息, 使用jainsip 自带的sdp解析方式
String contentString = new String(request.getRawContent());
- // jainSip不支持y=字段, 移除移除以解析。
+ // jainSip不支持y=字段, 移除以解析。
int ssrcIndex = contentString.indexOf("y=");
- //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段
- String ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
- String substring = contentString.substring(0, contentString.indexOf("y="));
- SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
+ // 检查是否有y字段
+ String ssrcDefault = "0000000000";
+ String ssrc;
+ SessionDescription sdp;
+ if (ssrcIndex >= 0) {
+ //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段
+ ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
+ String substring = contentString.substring(0, contentString.indexOf("y="));
+ sdp = SdpFactory.getInstance().createSessionDescription(substring);
+ }else {
+ ssrc = ssrcDefault;
+ sdp = SdpFactory.getInstance().createSessionDescription(contentString);
+ }
// 获取支持的格式
Vector mediaDescriptions = sdp.getMediaDescriptions(true);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/MessageRequestProcessor1.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/MessageRequestProcessor1.java
index 847f7e18c..2a908414e 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/MessageRequestProcessor1.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/MessageRequestProcessor1.java
@@ -914,21 +914,20 @@ public class MessageRequestProcessor1 extends SIPRequestProcessorParent implemen
String uuid = UUID.randomUUID().toString().replace("-", "");
RecordInfo recordInfo = new RecordInfo();
Element rootElement = getRootElement(evt);
- Element deviceIdElement = rootElement.element("DeviceID");
- String channelId = deviceIdElement.getText().toString();
- String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + channelId;
+ String sn = getText(rootElement, "SN");
+ String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn;
if (device != null ) {
rootElement = getRootElement(evt, device.getCharset());
}
recordInfo.setDeviceId(deviceId);
- recordInfo.setChannelId(channelId);
+ recordInfo.setSn(sn);
recordInfo.setName(getText(rootElement, "Name"));
if (getText(rootElement, "SumNum")== null || getText(rootElement, "SumNum") =="") {
recordInfo.setSumNum(0);
} else {
recordInfo.setSumNum(Integer.parseInt(getText(rootElement, "SumNum")));
}
- String sn = getText(rootElement, "SN");
+
Element recordListElement = rootElement.element("RecordList");
if (recordListElement == null || recordInfo.getSumNum() == 0) {
logger.info("无录像数据");
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
index de97c1266..faa392413 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -35,9 +35,7 @@ import java.text.ParseException;
import java.util.Iterator;
/**
- * @description: Notify请求处理器
- * @author: lawrencehj
- * @date: 2021年1月27日
+ * SIP命令类型: NOTIFY请求
*/
@Component
public class NotifyRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
@@ -230,8 +228,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
Element rootElement = getRootElement(evt);
- Element deviceIdElement = rootElement.element("DeviceID");
- String channelId = deviceIdElement.getText();
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
return;
@@ -254,22 +250,23 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
continue;
}
Element eventElement = itemDevice.element("Event");
+ DeviceChannel channel = channelContentHander(itemDevice);
switch (eventElement.getText().toUpperCase()) {
case "ON" : // 上线
- logger.info("收到来自设备【{}】的通道上线【{}】通知", device.getDeviceId(), channelId);
- storager.deviceChannelOnline(deviceId, channelId);
+ logger.info("收到来自设备【{}】的通道【{}】上线通知", device.getDeviceId(), channel.getChannelId());
+ storager.deviceChannelOnline(deviceId, channel.getChannelId());
// 回复200 OK
responseAck(evt, Response.OK);
break;
case "OFF" : // 离线
- logger.info("收到来自设备【{}】的通道离线【{}】通知", device.getDeviceId(), channelId);
- storager.deviceChannelOffline(deviceId, channelId);
+ logger.info("收到来自设备【{}】的通道【{}】离线通知", device.getDeviceId(), channel.getChannelId());
+ storager.deviceChannelOffline(deviceId, channel.getChannelId());
// 回复200 OK
responseAck(evt, Response.OK);
break;
case "VLOST" : // 视频丢失
- logger.info("收到来自设备【{}】的通道视频丢失【{}】通知", device.getDeviceId(), channelId);
- storager.deviceChannelOffline(deviceId, channelId);
+ logger.info("收到来自设备【{}】的通道【{}】视频丢失通知", device.getDeviceId(), channel.getChannelId());
+ storager.deviceChannelOffline(deviceId, channel.getChannelId());
// 回复200 OK
responseAck(evt, Response.OK);
break;
@@ -278,19 +275,17 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
responseAck(evt, Response.OK);
break;
case "ADD" : // 增加
- logger.info("收到来自设备【{}】的增加通道【{}】通知", device.getDeviceId(), channelId);
- DeviceChannel deviceChannel = channelContentHander(itemDevice, channelId);
- storager.updateChannel(deviceId, deviceChannel);
+ logger.info("收到来自设备【{}】的增加通道【{}】通知", device.getDeviceId(), channel.getChannelId());
+ storager.updateChannel(deviceId, channel);
responseAck(evt, Response.OK);
break;
case "DEL" : // 删除
- logger.info("收到来自设备【{}】的删除通道【{}】通知", device.getDeviceId(), channelId);
- storager.delChannel(deviceId, channelId);
+ logger.info("收到来自设备【{}】的删除通道【{}】通知", device.getDeviceId(), channel.getChannelId());
+ storager.delChannel(deviceId, channel.getChannelId());
responseAck(evt, Response.OK);
break;
case "UPDATE" : // 更新
- logger.info("收到来自设备【{}】的更新通道【{}】通知", device.getDeviceId(), channelId);
- DeviceChannel channel = channelContentHander(itemDevice, channelId);
+ logger.info("收到来自设备【{}】的更新通道【{}】通知", device.getDeviceId(), channel.getChannelId());
storager.updateChannel(deviceId, channel);
responseAck(evt, Response.OK);
break;
@@ -316,13 +311,15 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
}
}
- public DeviceChannel channelContentHander(Element itemDevice, String channelId){
+ public DeviceChannel channelContentHander(Element itemDevice){
Element channdelNameElement = itemDevice.element("Name");
String channelName = channdelNameElement != null ? channdelNameElement.getTextTrim().toString() : "";
Element statusElement = itemDevice.element("Status");
String status = statusElement != null ? statusElement.getTextTrim().toString() : "ON";
DeviceChannel deviceChannel = new DeviceChannel();
deviceChannel.setName(channelName);
+ Element channdelIdElement = itemDevice.element("DeviceID");
+ String channelId = channdelIdElement != null ? channdelIdElement.getTextTrim().toString() : "";
deviceChannel.setChannelId(channelId);
// ONLINE OFFLINE HIKVISION DS-7716N-E4 NVR的兼容性处理
if (status.equals("ON") || status.equals("On") || status.equals("ONLINE")) {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java
index 10e99cb7d..3f14e23e8 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java
@@ -35,10 +35,8 @@ import java.text.ParseException;
import java.util.Calendar;
import java.util.Locale;
-/**
- * @description:收到注册请求 处理
- * @author: swwheihei
- * @date: 2020年5月3日 下午4:47:25
+/**
+ * SIP命令类型: REGISTER请求
*/
@Component
public class RegisterRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
index ba835dbe9..be4b2ce99 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
@@ -18,10 +18,8 @@ import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
-/**
- * @description:SUBSCRIBE请求处理器
- * @author: swwheihei
- * @date: 2020年5月3日 下午5:31:20
+/**
+ * SIP命令类型: SUBSCRIBE请求
*/
@Component
public class SubscribeRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java
index efc8259cf..afaa7cb39 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java
@@ -14,10 +14,7 @@ import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent implements IMessageHandler{
- public static Map messageHandlerMap = new ConcurrentHashMap<>();
-
- @Autowired
- public MessageRequestProcessor messageRequestProcessor;
+ public Map messageHandlerMap = new ConcurrentHashMap<>();
public void addHandler(String cmdType, IMessageHandler messageHandler) {
messageHandlerMap.put(cmdType, messageHandler);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/ControlMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/ControlMessageHandler.java
index b533082a0..235a47705 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/ControlMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/ControlMessageHandler.java
@@ -6,6 +6,12 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+/**
+ * 命令类型: 控制命令
+ * 命令类型: 设备控制: 远程启动, 录像控制(TODO), 报警布防/撤防命令(TODO), 报警复位命令(TODO),
+ * 强制关键帧命令(TODO), 拉框放大/缩小控制命令(TODO), 看守位控制(TODO), 报警复位(TODO)
+ * 命令类型: 设备配置: SVAC编码配置(TODO), 音频参数(TODO), SVAC解码配置(TODO)
+ */
@Component
public class ControlMessageHandler extends MessageHandlerAbstract implements InitializingBean {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/DeviceControlQueryMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java
similarity index 95%
rename from src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/DeviceControlQueryMessageHandler.java
rename to src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java
index 1c4a166b2..980ec5d79 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/DeviceControlQueryMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java
@@ -1,4 +1,4 @@
-package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.query.cmd;
+package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.control.cmd;
import com.genersoft.iot.vmp.VManageBootstrap;
import com.genersoft.iot.vmp.gb28181.bean.Device;
@@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
+import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.control.ControlMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.query.QueryMessageHandler;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.utils.SpringBeanFactory;
@@ -37,7 +38,7 @@ public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent
private final String cmdType = "DeviceControl";
@Autowired
- private QueryMessageHandler queryMessageHandler;
+ private ControlMessageHandler controlMessageHandler;
@Autowired
private IVideoManagerStorager storager;
@@ -50,7 +51,7 @@ public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent
@Override
public void afterPropertiesSet() throws Exception {
- queryMessageHandler.addHandler(cmdType, this);
+ controlMessageHandler.addHandler(cmdType, this);
}
@Override
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/NotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/NotifyMessageHandler.java
index 56c020bec..c546057ee 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/NotifyMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/NotifyMessageHandler.java
@@ -1,14 +1,23 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageHandlerAbstract;
+import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageRequestProcessor;
import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+/**
+ * 命令类型: 通知命令
+ * 命令类型: 状态信息(心跳)报送, 报警通知, 媒体通知, 移动设备位置数据,语音广播通知(TODO), 设备预置位(TODO)
+ */
@Component
public class NotifyMessageHandler extends MessageHandlerAbstract implements InitializingBean {
private final String messageType = "Notify";
+ @Autowired
+ private MessageRequestProcessor messageRequestProcessor;
+
@Override
public void afterPropertiesSet() throws Exception {
messageRequestProcessor.addHandler(messageType, this);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/CatalogNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/CatalogNotifyMessageHandler.java
index 497f421b5..c6c1ab9c5 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/CatalogNotifyMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/CatalogNotifyMessageHandler.java
@@ -46,9 +46,6 @@ public class CatalogNotifyMessageHandler extends SIPRequestProcessorParent imple
@Autowired
private SipConfig config;
- @Autowired
- private EventPublisher publisher;
-
@Override
public void afterPropertiesSet() throws Exception {
notifyMessageHandler.addHandler(cmdType, this);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/QueryMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/QueryMessageHandler.java
index ab111b550..9a29955c9 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/QueryMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/QueryMessageHandler.java
@@ -6,6 +6,10 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+/**
+ * 命令类型: 查询指令
+ * 命令类型: 设备状态, 设备目录信息, 设备信息, 文件目录检索(TODO), 报警(TODO), 设备配置(TODO), 设备预置位(TODO), 移动设备位置数据(TODO)
+ */
@Component
public class QueryMessageHandler extends MessageHandlerAbstract implements InitializingBean {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/AlarmQueryMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/AlarmQueryMessageHandler.java
new file mode 100644
index 000000000..60cf4d073
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/AlarmQueryMessageHandler.java
@@ -0,0 +1,77 @@
+package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.query.cmd;
+
+import com.genersoft.iot.vmp.conf.SipConfig;
+import com.genersoft.iot.vmp.gb28181.bean.Device;
+import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
+import com.genersoft.iot.vmp.gb28181.bean.GbStream;
+import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
+import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
+import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
+import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
+import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.query.QueryMessageHandler;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
+import org.dom4j.Element;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.sip.InvalidArgumentException;
+import javax.sip.RequestEvent;
+import javax.sip.SipException;
+import javax.sip.header.FromHeader;
+import javax.sip.message.Response;
+import java.text.ParseException;
+import java.util.List;
+
+@Component
+public class AlarmQueryMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
+
+ private Logger logger = LoggerFactory.getLogger(AlarmQueryMessageHandler.class);
+ private final String cmdType = "Alarm";
+
+ @Autowired
+ private QueryMessageHandler queryMessageHandler;
+
+ @Autowired
+ private IVideoManagerStorager storager;
+
+ @Autowired
+ private SIPCommanderFroPlatform cmderFroPlatform;
+
+ @Autowired
+ private SipConfig config;
+
+ @Autowired
+ private EventPublisher publisher;
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ queryMessageHandler.addHandler(cmdType, this);
+ }
+
+ @Override
+ public void handForDevice(RequestEvent evt, Device device, Element element) {
+
+ }
+
+ @Override
+ public void handForPlatform(RequestEvent evt, ParentPlatform parentPlatform, Element rootElement) {
+
+ logger.info("不支持alarm查询");
+ try {
+ responseAck(evt, Response.NOT_FOUND, "not support alarm query");
+ } catch (SipException e) {
+ e.printStackTrace();
+ } catch (InvalidArgumentException e) {
+ e.printStackTrace();
+ } catch (ParseException e) {
+ e.printStackTrace();
+ }
+
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/ResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/ResponseMessageHandler.java
index 13c8ac760..18da9cde9 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/ResponseMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/ResponseMessageHandler.java
@@ -6,6 +6,10 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+/**
+ * 命令类型: 请求动作的应答
+ * 命令类型: 设备控制, 报警通知, 设备目录信息查询, 目录信息查询, 目录收到, 设备信息查询, 设备状态信息查询 ......
+ */
@Component
public class ResponseMessageHandler extends MessageHandlerAbstract implements InitializingBean {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java
index f1919da4d..f0f842138 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java
@@ -64,18 +64,16 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
rootElement = getRootElement(evt, device.getCharset());
String uuid = UUID.randomUUID().toString().replace("-", "");
RecordInfo recordInfo = new RecordInfo();
- Element deviceIdElement = rootElement.element("DeviceID");
- String channelId = deviceIdElement.getText();
- String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + device.getDeviceId() + channelId;
+ String sn = getText(rootElement, "SN");
+ String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + device.getDeviceId() + sn;
recordInfo.setDeviceId(device.getDeviceId());
- recordInfo.setChannelId(channelId);
+ recordInfo.setSn(sn);
recordInfo.setName(getText(rootElement, "Name"));
if (getText(rootElement, "SumNum") == null || getText(rootElement, "SumNum") == "") {
recordInfo.setSumNum(0);
} else {
recordInfo.setSumNum(Integer.parseInt(getText(rootElement, "SumNum")));
}
- String sn = getText(rootElement, "SN");
Element recordListElement = rootElement.element("RecordList");
if (recordListElement == null || recordInfo.getSumNum() == 0) {
logger.info("无录像数据");
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java
index d44c1a9d1..64933b80b 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java
@@ -42,7 +42,6 @@ public class ByeResponseProcessor extends SIPResponseProcessorAbstract {
@Override
public void process(ResponseEvent evt) {
// TODO Auto-generated method stub
- System.out.println("收到bye");
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index f74dcfa76..233416a8d 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -3,13 +3,18 @@ package com.genersoft.iot.vmp.media.zlm;
import java.util.List;
import java.util.UUID;
+import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.Device;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IMediaService;
+import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
@@ -56,6 +61,12 @@ public class ZLMHttpHookListener {
@Autowired
private IMediaServerService mediaServerService;
+ @Autowired
+ private IStreamProxyService streamProxyService;
+
+ @Autowired
+ private IMediaService mediaService;
+
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils;
@@ -153,12 +164,20 @@ public class ZLMHttpHookListener {
subscribe.response(mediaInfo, json);
}
}
+ String app = json.getString("app");
+ String stream = json.getString("stream");
+ StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(stream);
JSONObject ret = new JSONObject();
+ // 录像回放时不进行录像下载
+ if (streamInfo != null) {
+ ret.put("enableMP4", false);
+ }else {
+ ret.put("enableMP4", userSetup.isRecordPushLive());
+ }
ret.put("code", 0);
ret.put("msg", "success");
ret.put("enableHls", true);
ret.put("enableMP4", userSetup.isRecordPushLive());
- ret.put("enableRtxp", true);
return new ResponseEntity(ret.toString(), HttpStatus.OK);
}
@@ -254,12 +273,13 @@ public class ZLMHttpHookListener {
*/
@ResponseBody
@PostMapping(value = "/on_stream_changed", produces = "application/json;charset=UTF-8")
- public ResponseEntity onStreamChanged(@RequestBody JSONObject json){
+ public ResponseEntity onStreamChanged(@RequestBody MediaItem item){
if (logger.isDebugEnabled()) {
- logger.debug("ZLM HOOK on_stream_changed API调用,参数:" + json.toString());
+ logger.debug("ZLM HOOK on_stream_changed API调用,参数:" + JSONObject.toJSONString(item));
}
- String mediaServerId = json.getString("mediaServerId");
+ String mediaServerId = item.getMediaServerId();
+ JSONObject json = (JSONObject) JSON.toJSON(item);
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, json);
if (subscribe != null ) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
@@ -268,13 +288,12 @@ public class ZLMHttpHookListener {
}
}
-
// 流消失移除redis play
- String app = json.getString("app");
- String streamId = json.getString("stream");
- String schema = json.getString("schema");
- JSONArray tracks = json.getJSONArray("tracks");
- boolean regist = json.getBoolean("regist");
+ String app = item.getApp();
+ String streamId = item.getStream();
+ String schema = item.getSchema();
+ List tracks = item.getTracks();
+ boolean regist = item.isRegist();
if (tracks != null) {
logger.info("[stream: " + streamId + "] on_stream_changed->>" + schema);
}
@@ -294,12 +313,34 @@ public class ZLMHttpHookListener {
redisCatchStorage.stopPlayback(streamInfo);
}
}else {
- if (!"rtp".equals(app) ){
+ if (!"rtp".equals(app)){
+
+ boolean pushChange = false;
+
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
if (regist) {
- zlmMediaListManager.addMedia(mediaServerItem, app, streamId);
+ if ((item.getOriginType() == 1 || item.getOriginType() == 2 || item.getOriginType() == 8)) {
+ pushChange = true;
+ zlmMediaListManager.addMedia(item);
+ StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks);
+ redisCatchStorage.addPushStream(mediaServerItem, app, streamId, streamInfo);
+ }
}else {
- zlmMediaListManager.removeMedia( app, streamId);
+ int result = zlmMediaListManager.removeMedia( app, streamId);
+ redisCatchStorage.removePushStream(mediaServerItem, app, streamId);
+ if (result > 0) {
+ pushChange = true;
+ }
+ }
+ if(pushChange) {
+ // 发送流变化redis消息
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("serverId", userSetup.getServerId());
+ jsonObject.put("app", app);
+ jsonObject.put("stream", streamId);
+ jsonObject.put("register", regist);
+ jsonObject.put("mediaServerId", mediaServerId);
+ redisCatchStorage.sendStreamChangeMsg(jsonObject);
}
}
}
@@ -325,14 +366,13 @@ public class ZLMHttpHookListener {
String mediaServerId = json.getString("mediaServerId");
String streamId = json.getString("stream");
String app = json.getString("app");
-
- // TODO 如果在给上级推流,也不停止。
+ JSONObject ret = new JSONObject();
+ ret.put("code", 0);
if ("rtp".equals(app)){
- JSONObject ret = new JSONObject();
- ret.put("code", 0);
ret.put("close", true);
StreamInfo streamInfoForPlayCatch = redisCatchStorage.queryPlayByStreamId(streamId);
if (streamInfoForPlayCatch != null) {
+ // 如果在给上级推流,也不停止。
if (redisCatchStorage.isChannelSendingRTP(streamInfoForPlayCatch.getChannelId())) {
ret.put("close", false);
} else {
@@ -345,6 +385,12 @@ public class ZLMHttpHookListener {
if (streamInfoForPlayBackCatch != null) {
cmder.streamByeCmd(streamInfoForPlayBackCatch.getDeviceID(), streamInfoForPlayBackCatch.getChannelId());
redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch);
+ }else {
+ StreamInfo streamInfoForDownload = redisCatchStorage.queryDownloadByStreamId(streamId);
+ // 进行录像下载时无人观看不断流
+ if (streamInfoForDownload != null) {
+ ret.put("close", false);
+ }
}
}
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
@@ -353,9 +399,15 @@ public class ZLMHttpHookListener {
}
return new ResponseEntity(ret.toString(),HttpStatus.OK);
}else {
- JSONObject ret = new JSONObject();
- ret.put("code", 0);
- ret.put("close", false);
+ StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, streamId);
+ if (streamProxyItem != null && streamProxyItem.isEnable_remove_none_reader()) {
+ ret.put("close", true);
+ streamProxyService.del(app, streamId);
+ String url = streamProxyItem.getUrl() != null?streamProxyItem.getUrl():streamProxyItem.getSrc_url();
+ logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", app, streamId, url);
+ }else {
+ ret.put("close", false);
+ }
return new ResponseEntity(ret.toString(),HttpStatus.OK);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
index ea1123cbd..49fe098e0 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
@@ -87,6 +88,10 @@ public class ZLMMediaListManager {
updateMedia(mediaServerItem, app, streamId);
}
+ public void addMedia(MediaItem mediaItem) {
+ storager.updateMedia(streamPushService.transform(mediaItem));
+ }
+
public void updateMedia(MediaServerItem mediaServerItem, String app, String streamId) {
//使用异步更新推流
@@ -113,14 +118,16 @@ public class ZLMMediaListManager {
}
- public void removeMedia(String app, String streamId) {
+ public int removeMedia(String app, String streamId) {
// 查找是否关联了国标, 关联了不删除, 置为离线
StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(app, streamId);
+ int result = 0;
if (streamProxyItem == null) {
- storager.removeMedia(app, streamId);
+ result = storager.removeMedia(app, streamId);
}else {
- storager.mediaOutline(app, streamId);
+ result =storager.mediaOutline(app, streamId);
}
+ return result;
}
// public void clearAllSessions() {
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
index 2c95216ac..e4bcd31ac 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
@@ -29,7 +29,6 @@ public class ZLMRESTfulUtils {
OkHttpClient client = new OkHttpClient();
String url = String.format("http://%s:%s/index/api/%s", mediaServerItem.getIp(), mediaServerItem.getHttpPort(), api);
JSONObject responseJSON = null;
- logger.debug(url);
FormBody.Builder builder = new FormBody.Builder();
builder.add("secret",mediaServerItem.getSecret());
@@ -51,8 +50,9 @@ public class ZLMRESTfulUtils {
try {
Response response = client.newCall(request).execute();
if (response.isSuccessful()) {
- String responseStr = response.body().string();
- if (responseStr != null) {
+ ResponseBody responseBody = response.body();
+ if (responseBody != null) {
+ String responseStr = responseBody.string();
responseJSON = JSON.parseObject(responseStr);
}
}else {
@@ -100,7 +100,11 @@ public class ZLMRESTfulUtils {
public void sendGetForImg(MediaServerItem mediaServerItem, String api, Map params, String targetPath, String fileName) {
String url = String.format("http://%s:%s/index/api/%s", mediaServerItem.getIp(), mediaServerItem.getHttpPort(), api);
logger.debug(url);
- HttpUrl.Builder httpBuilder = HttpUrl.parse(url).newBuilder();
+ HttpUrl parseUrl = HttpUrl.parse(url);
+ if (parseUrl == null) {
+ return;
+ }
+ HttpUrl.Builder httpBuilder = parseUrl.newBuilder();
httpBuilder.addQueryParameter("secret", mediaServerItem.getSecret());
if (params != null) {
@@ -123,16 +127,20 @@ public class ZLMRESTfulUtils {
if (targetPath != null) {
File snapFolder = new File(targetPath);
if (!snapFolder.exists()) {
- snapFolder.mkdirs();
+ if (!snapFolder.mkdirs()) {
+ logger.warn("{}路径创建失败", snapFolder.getAbsolutePath());
+ }
+
}
File snapFile = new File(targetPath + "/" + fileName);
FileOutputStream outStream = new FileOutputStream(snapFile);
- outStream.write(response.body().bytes());
+
+ outStream.write(Objects.requireNonNull(response.body()).bytes());
outStream.close();
} else {
logger.error(String.format("[ %s ]请求失败: %s %s", url, response.code(), response.message()));
}
- response.body().close();
+ Objects.requireNonNull(response.body()).close();
} else {
logger.error(String.format("[ %s ]请求失败: %s %s", url, response.code(), response.message()));
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java
index 6d9ceee91..4685d1fd6 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java
@@ -4,6 +4,11 @@ import java.util.List;
public class MediaItem {
+ /**
+ * 注册/注销
+ */
+ private boolean regist;
+
/**
* 应用名
*/
@@ -53,6 +58,11 @@ public class MediaItem {
*/
private String originUrl;
+ /**
+ * 服务器id
+ */
+ private String mediaServerId;
+
/**
* GMT unix系统时间戳,单位秒
*/
@@ -78,6 +88,14 @@ public class MediaItem {
*/
private String vhost;
+ public boolean isRegist() {
+ return regist;
+ }
+
+ public void setRegist(boolean regist) {
+ this.regist = regist;
+ }
+
/**
* 是否是docker部署, docker部署不会自动更新zlm使用的端口,需要自己手动修改
*/
@@ -376,4 +394,12 @@ public class MediaItem {
public void setDocker(boolean docker) {
this.docker = docker;
}
+
+ public String getMediaServerId() {
+ return mediaServerId;
+ }
+
+ public void setMediaServerId(String mediaServerId) {
+ this.mediaServerId = mediaServerId;
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java
index 40ba215fb..38e44a98c 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java
@@ -17,6 +17,7 @@ public class StreamProxyItem extends GbStream {
private boolean enable;
private boolean enable_hls;
private boolean enable_mp4;
+ private boolean enable_remove_none_reader; // 无人观看时删除
private String platformGbId;
private String createTime;
@@ -142,4 +143,12 @@ public class StreamProxyItem extends GbStream {
public void setCreateTime(String createTime) {
this.createTime = createTime;
}
+
+ public boolean isEnable_remove_none_reader() {
+ return enable_remove_none_reader;
+ }
+
+ public void setEnable_remove_none_reader(boolean enable_remove_none_reader) {
+ this.enable_remove_none_reader = enable_remove_none_reader;
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/onvif/IONVIFServer.java b/src/main/java/com/genersoft/iot/vmp/onvif/IONVIFServer.java
deleted file mode 100644
index eb81a3635..000000000
--- a/src/main/java/com/genersoft/iot/vmp/onvif/IONVIFServer.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package com.genersoft.iot.vmp.onvif;
-
-import be.teletask.onvif.models.OnvifDevice;
-import com.genersoft.iot.vmp.onvif.dto.ONVIFCallBack;
-
-import java.util.List;
-
-public interface IONVIFServer {
-
- void search(int timeout, ONVIFCallBack> callBack);
-
- void getRTSPUrl(int timeout, OnvifDevice device, ONVIFCallBack callBack);
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/onvif/dto/ONVIFCallBack.java b/src/main/java/com/genersoft/iot/vmp/onvif/dto/ONVIFCallBack.java
deleted file mode 100644
index 3fdbde5b9..000000000
--- a/src/main/java/com/genersoft/iot/vmp/onvif/dto/ONVIFCallBack.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package com.genersoft.iot.vmp.onvif.dto;
-
-public interface ONVIFCallBack {
- void run(int errorCode, T t);
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/onvif/impl/ONVIFServerIMpl.java b/src/main/java/com/genersoft/iot/vmp/onvif/impl/ONVIFServerIMpl.java
deleted file mode 100644
index d952cc8fb..000000000
--- a/src/main/java/com/genersoft/iot/vmp/onvif/impl/ONVIFServerIMpl.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package com.genersoft.iot.vmp.onvif.impl;
-
-
-import be.teletask.onvif.DiscoveryManager;
-import be.teletask.onvif.OnvifManager;
-import be.teletask.onvif.listeners.*;
-import be.teletask.onvif.models.*;
-import be.teletask.onvif.responses.OnvifResponse;
-import com.genersoft.iot.vmp.onvif.IONVIFServer;
-import com.genersoft.iot.vmp.onvif.dto.ONVIFCallBack;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-@SuppressWarnings("rawtypes")
-/**
- * 处理onvif的各种操作
- */
-@Service
-public class ONVIFServerIMpl implements IONVIFServer {
-
- private final static Logger logger = LoggerFactory.getLogger(ONVIFServerIMpl.class);
-
- @Override
- public void search(int timeout, ONVIFCallBack> callBack) {
- DiscoveryManager manager = new DiscoveryManager();
- manager.setDiscoveryTimeout(timeout);
- Map deviceMap = new HashMap<>();
- // 搜索设备
- manager.discover(new DiscoveryListener() {
- @Override
- public void onDiscoveryStarted() {
- logger.info("Discovery started");
- }
-
- @Override
- public void onDevicesFound(List devices) {
- if (devices == null || devices.size() == 0) return;
- for (Device device : devices){
- logger.info(device.getHostName());
- deviceMap.put(device.getHostName(), device);
- }
- }
-
- // 搜索结束
- @Override
- public void onDiscoveryFinished() {
- ArrayList result = new ArrayList<>();
- for (Device device : deviceMap.values()) {
- logger.info(device.getHostName());
- result.add(device.getHostName());
- }
- callBack.run(0, result);
- }
- });
- }
-
- @Override
- public void getRTSPUrl(int timeout, OnvifDevice device, ONVIFCallBack callBack) {
- if (device.getHostName() == null ){
- callBack.run(400, null);
- }
- OnvifManager onvifManager = new OnvifManager();
- onvifManager.setOnvifResponseListener(new OnvifResponseListener(){
-
- @Override
- public void onResponse(OnvifDevice onvifDevice, OnvifResponse response) {
- logger.info("[RESPONSE] " + onvifDevice.getHostName()
- + "======" + response.getErrorCode()
- + "======" + response.getErrorMessage());
- }
-
- @Override
- public void onError(OnvifDevice onvifDevice, int errorCode, String errorMessage) {
- logger.info("[ERROR] " + onvifDevice.getHostName() + "======" + errorCode + "=======" + errorMessage);
- callBack.run(errorCode, errorMessage);
- }
- });
-
- try {
- onvifManager.getServices(device, (OnvifDevice onvifDevice, OnvifServices services) -> {
- if (services.getProfilesPath().equals("/onvif/Media")) {
- onvifDevice.setPath(services);
- onvifManager.getMediaProfiles(onvifDevice, new OnvifMediaProfilesListener() {
- @Override
- public void onMediaProfilesReceived(OnvifDevice device, List mediaProfiles) {
- for (OnvifMediaProfile mediaProfile : mediaProfiles) {
- logger.info(mediaProfile.getName());
- logger.info(mediaProfile.getToken());
- if (mediaProfile.getName().equals("mainStream")) {
- onvifManager.getMediaStreamURI(device, mediaProfile, (OnvifDevice onvifDevice,
- OnvifMediaProfile profile, String uri) -> {
-
- uri = uri.replace("rtsp://", "rtsp://"+ device.getUsername() + ":"+ device.getPassword() + "@");
- logger.info(onvifDevice.getHostName() + "的地址" + uri);
- callBack.run(0, uri);
- });
- }
- }
- }
- });
- }
- });
- }catch (Exception e) {
- callBack.run(400, e.getMessage());
- }
-
-
- }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
index f42b86791..657d28085 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
@@ -61,4 +61,6 @@ public interface IMediaServerService {
boolean checkMediaRecordServer(String ip, int port);
void delete(String id);
+
+ MediaServerItem getDefaultMediaServer();
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java
index 54f8315e5..8c05b85f2 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java
@@ -32,7 +32,7 @@ public interface IMediaService {
* @param stream
* @return
*/
- StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaServerItem, String app, String stream, JSONArray tracks);
+ StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaServerItem, String app, String stream, Object tracks);
/**
* 根据应用名和流ID获取播放地址, 只是地址拼接,返回的ip使用远程访问ip,适用与zlm与wvp在一台主机的情况
@@ -40,5 +40,5 @@ public interface IMediaService {
* @param stream
* @return
*/
- StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, JSONArray tracks, String addr);
+ StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
index 9e5c44422..8a7437cdd 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
@@ -18,4 +18,6 @@ public interface IPlayService {
PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent);
MediaServerItem getNewMediaServerItem(Device device);
+
+ void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String toString);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java
index 12e489832..60f3303b1 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java
@@ -1,8 +1,10 @@
package com.genersoft.iot.vmp.service;
import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
public interface IStreamProxyService {
@@ -11,7 +13,7 @@ public interface IStreamProxyService {
* 保存视频代理
* @param param
*/
- String save(StreamProxyItem param);
+ WVPResult save(StreamProxyItem param);
/**
* 添加视频代理到zlm
@@ -63,4 +65,10 @@ public interface IStreamProxyService {
* @return
*/
JSONObject getFFmpegCMDs(MediaServerItem mediaServerItem);
+
+ /**
+ * 根据app与stream获取streamProxy
+ * @return
+ */
+ StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
index 899da98ba..94e7d691f 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.github.pagehelper.PageInfo;
@@ -32,4 +33,6 @@ public interface IStreamPushService {
* @return
*/
PageInfo getPushList(Integer page, Integer count);
+
+ StreamPushItem transform(MediaItem item);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
index 595f38c88..f9143135b 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -10,6 +10,9 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+/**
+ * 设备业务(目录订阅)
+ */
@Service
public class DeviceServiceImpl implements IDeviceService {
@@ -31,8 +34,11 @@ public class DeviceServiceImpl implements IDeviceService {
CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander);
catalogSubscribeTask.run();
// 提前开始刷新订阅
- String cron = getCron(device.getSubscribeCycleForCatalog() - 60);
- dynamicTask.startCron(device.getDeviceId(), catalogSubscribeTask, cron);
+ // TODO 使用jain sip的当时刷新订阅
+ int subscribeCycleForCatalog = device.getSubscribeCycleForCatalog();
+ // 设置最小值为30
+ subscribeCycleForCatalog = Math.max(subscribeCycleForCatalog, 30);
+ dynamicTask.startCron(device.getDeviceId(), catalogSubscribeTask, subscribeCycleForCatalog - 5);
return true;
}
@@ -41,21 +47,10 @@ public class DeviceServiceImpl implements IDeviceService {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
return false;
}
+ logger.info("移除目录订阅: {}", device.getDeviceId());
dynamicTask.stopCron(device.getDeviceId());
+ device.setSubscribeCycleForCatalog(0);
+ sipCommander.catalogSubscribe(device, null, null);
return true;
}
-
- public String getCron(int time) {
- if (time <= 59) {
- return "0/" + time +" * * * * ?";
- }else if (time <= 60* 59) {
- int minute = time/(60);
- return "0 0/" + minute +" * * * ?";
- }else if (time <= 60* 60* 59) {
- int hour = time/(60*60);
- return "0 0 0/" + hour +" * * ?";
- }else {
- return "0 0/10 * * * ?";
- }
- }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
index 4ba361c03..51f99d92e 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -47,7 +47,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
private boolean sslEnabled;
@Value("${server.port}")
- private String serverPort;
+ private Integer serverPort;
@Autowired
private MediaConfig mediaConfig;
@@ -241,6 +241,11 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
return mediaServerMapper.queryOneByHostAndPort(host, port);
}
+ @Override
+ public MediaServerItem getDefaultMediaServer() {
+ return mediaServerMapper.queryDefault();
+ }
+
@Override
public void clearMediaServerForOnline() {
String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX;
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
index 51c5979e8..9e5221bce 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -31,14 +32,20 @@ public class MediaServiceImpl implements IMediaService {
@Override
- public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, JSONArray tracks) {
+ public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks) {
return getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, null);
}
@Override
public StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId, String addr) {
StreamInfo streamInfo = null;
- MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
+
+ MediaServerItem mediaInfo;
+ if (mediaServerId == null) {
+ mediaInfo = mediaServerService.getDefaultMediaServer();
+ }else {
+ mediaInfo = mediaServerService.getOne(mediaServerId);
+ }
if (mediaInfo == null) {
return streamInfo;
}
@@ -55,13 +62,15 @@ public class MediaServiceImpl implements IMediaService {
return streamInfo;
}
+
+
@Override
public StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId) {
return getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, null);
}
@Override
- public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, JSONArray tracks, String addr) {
+ public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr) {
StreamInfo streamInfoResult = new StreamInfo();
streamInfoResult.setStreamId(stream);
streamInfoResult.setApp(app);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index d7b8ffe82..640e99aac 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -34,12 +34,8 @@ import org.springframework.stereotype.Service;
import org.springframework.util.ResourceUtils;
import org.springframework.web.context.request.async.DeferredResult;
-import javax.sip.DialogTerminatedEvent;
-import javax.sip.ResponseEvent;
-import javax.sip.TimeoutEvent;
-import javax.sip.TransactionTerminatedEvent;
-import javax.sip.message.Response;
import java.io.FileNotFoundException;
+import java.util.Objects;
import java.util.UUID;
@SuppressWarnings(value = {"rawtypes", "unchecked"})
@@ -85,7 +81,13 @@ public class PlayServiceImpl implements IPlayService {
RequestMessage msg = new RequestMessage();
String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
msg.setKey(key);
- msg.setId(playResult.getUuid());
+ String uuid = UUID.randomUUID().toString();
+ msg.setId(uuid);
+ playResult.setUuid(uuid);
+ DeferredResult> result = new DeferredResult<>(userSetup.getPlayTimeout());
+ playResult.setResult(result);
+ // 录像查询以channelId作为deviceId查询
+ resultHolder.put(key, uuid, result);
if (mediaServerItem == null) {
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(-1);
@@ -94,16 +96,9 @@ public class PlayServiceImpl implements IPlayService {
resultHolder.invokeResult(msg);
return playResult;
}
-
Device device = storager.queryVideoDevice(deviceId);
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
playResult.setDevice(device);
- String uuid = UUID.randomUUID().toString();
- playResult.setUuid(uuid);
- DeferredResult> result = new DeferredResult>(userSetup.getPlayTimeout());
- playResult.setResult(result);
- // 录像查询以channelId作为deviceId查询
- resultHolder.put(key, uuid, result);
// 超时处理
result.onTimeout(()->{
logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
@@ -134,18 +129,18 @@ public class PlayServiceImpl implements IPlayService {
classPath = classPath.substring(0, classPath.lastIndexOf("/") + 1);
}
if (classPath.startsWith("file:")) {
- classPath = classPath.substring(classPath.indexOf(":") + 1, classPath.length());
+ classPath = classPath.substring(classPath.indexOf(":") + 1);
}
String path = classPath + "static/static/snap/";
// 兼容Windows系统路径(去除前面的“/”)
if(System.getProperty("os.name").contains("indows")) {
- path = path.substring(1, path.length());
+ path = path.substring(1);
}
String fileName = deviceId + "_" + channelId + ".jpg";
ResponseEntity responseEntity = (ResponseEntity)result.getResult();
if (responseEntity != null && responseEntity.getStatusCode() == HttpStatus.OK) {
WVPResult wvpResult = (WVPResult)responseEntity.getBody();
- if (wvpResult.getCode() == 0) {
+ if (Objects.requireNonNull(wvpResult).getCode() == 0) {
StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
String streamUrl = streamInfoForSuccess.getFmp4();
@@ -169,7 +164,7 @@ public class PlayServiceImpl implements IPlayService {
// 发送点播消息
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
- onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, uuid.toString());
+ onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, uuid);
if (hookEvent != null) {
hookEvent.response(mediaServerItem, response);
}
@@ -192,7 +187,7 @@ public class PlayServiceImpl implements IPlayService {
if (streamId == null) {
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(-1);
- wvpResult.setMsg(String.format("点播失败, redis缓存streamId等于null"));
+ wvpResult.setMsg("点播失败, redis缓存streamId等于null");
msg.setData(wvpResult);
resultHolder.invokeAllResult(msg);
return playResult;
@@ -226,7 +221,7 @@ public class PlayServiceImpl implements IPlayService {
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
- onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid.toString());
+ onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid);
}, (event) -> {
mediaServerService.closeRTPServer(playResult.getDevice(), channelId);
WVPResult wvpResult = new WVPResult();
@@ -274,7 +269,7 @@ public class PlayServiceImpl implements IPlayService {
public MediaServerItem getNewMediaServerItem(Device device) {
if (device == null) return null;
String mediaServerId = device.getMediaServerId();
- MediaServerItem mediaServerItem = null;
+ MediaServerItem mediaServerItem;
if (mediaServerId == null) {
mediaServerItem = mediaServerService.getMediaServerForMinimumLoad();
}else {
@@ -286,16 +281,35 @@ public class PlayServiceImpl implements IPlayService {
return mediaServerItem;
}
+
@Override
public void onPublishHandlerForPlayBack(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) {
RequestMessage msg = new RequestMessage();
- msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
+ msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId);
msg.setId(uuid);
StreamInfo streamInfo = onPublishHandler(mediaServerItem, resonse, deviceId, channelId, uuid);
if (streamInfo != null) {
redisCatchStorage.startPlayback(streamInfo);
msg.setData(JSON.toJSONString(streamInfo));
resultHolder.invokeResult(msg);
+ } else {
+ logger.warn("设备回放API调用失败!");
+ msg.setData("设备回放API调用失败!");
+ resultHolder.invokeResult(msg);
+ }
+ }
+
+
+ @Override
+ public void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
+ RequestMessage msg = new RequestMessage();
+ msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId);
+ msg.setId(uuid);
+ StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId, uuid);
+ if (streamInfo != null) {
+ redisCatchStorage.startDownload(streamInfo);
+ msg.setData(JSON.toJSONString(streamInfo));
+ resultHolder.invokeResult(msg);
} else {
logger.warn("设备预览API调用失败!");
msg.setData("设备预览API调用失败!");
@@ -303,6 +317,7 @@ public class PlayServiceImpl implements IPlayService {
}
}
+
public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) {
String streamId = resonse.getString("stream");
JSONArray tracks = resonse.getJSONArray("tracks");
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
index bbcad1c39..3ffc68e39 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -1,18 +1,21 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.service.IStreamProxyService;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +37,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
private IVideoManagerStorager videoManagerStorager;
@Autowired
- private IRedisCatchStorage redisCatchStorage;
+ private IMediaService mediaService;
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils;;
@@ -56,8 +59,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
- public String save(StreamProxyItem param) {
+ public WVPResult save(StreamProxyItem param) {
MediaServerItem mediaInfo;
+ WVPResult wvpResult = new WVPResult<>();
+ wvpResult.setCode(0);
if ("auto".equals(param.getMediaServerId())){
mediaInfo = mediaServerService.getMediaServerForMinimumLoad();
}else {
@@ -65,7 +70,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
if (mediaInfo == null) {
logger.warn("保存代理未找到在线的ZLM...");
- return "保存失败";
+ wvpResult.setMsg("保存失败");
+ return wvpResult;
}
String dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(),
param.getStream() );
@@ -83,6 +89,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
result.append(", 但是启用失败,请检查流地址是否可用");
param.setEnable(false);
videoManagerStorager.updateStreamProxy(param);
+ }else {
+ StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
+ mediaInfo, param.getApp(), param.getStream(), null);
+ wvpResult.setData(streamInfo);
}
}
}
@@ -97,6 +107,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
result.append(", 但是启用失败,请检查流地址是否可用");
param.setEnable(false);
videoManagerStorager.updateStreamProxy(param);
+ }else {
+ StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
+ mediaInfo, param.getApp(), param.getStream(), null);
+ wvpResult.setData(streamInfo);
}
}
}else {
@@ -113,7 +127,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
result.append(", 关联国标平台[ " + param.getPlatformGbId() + " ]失败");
}
}
- return result.toString();
+ wvpResult.setMsg(result.toString());
+ return wvpResult;
}
@Override
@@ -213,4 +228,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
return result;
}
+
+
+ @Override
+ public StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId) {
+ return videoManagerStorager.getStreamProxyByAppAndStream(app, streamId);
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
index aabf35f35..28207212e 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -51,33 +51,38 @@ public class StreamPushServiceImpl implements IStreamPushService {
for (MediaItem item : mediaItems) {
// 不保存国标推理以及拉流代理的流
- if (item.getOriginType() == 3 || item.getOriginType() == 4 || item.getOriginType() == 5) {
- continue;
- }
- String key = item.getApp() + "_" + item.getStream();
- StreamPushItem streamPushItem = result.get(key);
- if (streamPushItem == null) {
- streamPushItem = new StreamPushItem();
- streamPushItem.setApp(item.getApp());
- streamPushItem.setMediaServerId(mediaServerItem.getId());
- streamPushItem.setStream(item.getStream());
- streamPushItem.setAliveSecond(item.getAliveSecond());
- streamPushItem.setCreateStamp(item.getCreateStamp());
- streamPushItem.setOriginSock(item.getOriginSock());
- streamPushItem.setTotalReaderCount(item.getTotalReaderCount());
- streamPushItem.setOriginType(item.getOriginType());
- streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
- streamPushItem.setOriginUrl(item.getOriginUrl());
- streamPushItem.setCreateStamp(item.getCreateStamp());
- streamPushItem.setAliveSecond(item.getAliveSecond());
- streamPushItem.setStatus(true);
- streamPushItem.setVhost(item.getVhost());
- result.put(key, streamPushItem);
+ if (item.getOriginType() == 1 || item.getOriginType() == 2 || item.getOriginType() == 8) {
+ String key = item.getApp() + "_" + item.getStream();
+ StreamPushItem streamPushItem = result.get(key);
+ if (streamPushItem == null) {
+ streamPushItem = transform(item);
+ result.put(key, streamPushItem);
+ }
}
+
}
return new ArrayList<>(result.values());
}
+ @Override
+ public StreamPushItem transform(MediaItem item) {
+ StreamPushItem streamPushItem = new StreamPushItem();
+ streamPushItem.setApp(item.getApp());
+ streamPushItem.setMediaServerId(item.getMediaServerId());
+ streamPushItem.setStream(item.getStream());
+ streamPushItem.setAliveSecond(item.getAliveSecond());
+ streamPushItem.setCreateStamp(item.getCreateStamp());
+ streamPushItem.setOriginSock(item.getOriginSock());
+ streamPushItem.setTotalReaderCount(item.getTotalReaderCount());
+ streamPushItem.setOriginType(item.getOriginType());
+ streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
+ streamPushItem.setOriginUrl(item.getOriginUrl());
+ streamPushItem.setCreateStamp(item.getCreateStamp());
+ streamPushItem.setAliveSecond(item.getAliveSecond());
+ streamPushItem.setStatus(true);
+ streamPushItem.setVhost(item.getVhost());
+ return streamPushItem;
+ }
@Override
public PageInfo getPushList(Integer page, Integer count) {
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
index e4313d905..5878339bf 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import java.util.List;
import java.util.Map;
@@ -120,5 +121,35 @@ public interface IRedisCatchStorage {
/**
* 在redis添加wvp的信息
*/
- void updateWVPInfo(JSONObject jsonObject);
+ void updateWVPInfo(String id, JSONObject jsonObject, int time);
+
+ /**
+ * 发送推流生成与推流消失消息
+ * @param jsonObject 消息内容
+ */
+ void sendStreamChangeMsg(JSONObject jsonObject);
+
+ /**
+ * 添加流信息到redis
+ * @param mediaServerItem
+ * @param app
+ * @param streamId
+ */
+ void addPushStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo);
+
+ /**
+ * 移除流信息从redis
+ * @param mediaServerItem
+ * @param app
+ * @param streamId
+ */
+ void removePushStream(MediaServerItem mediaServerItem, String app, String streamId);
+
+ /**
+ * 开始下载录像时存入
+ * @param streamInfo
+ */
+ boolean startDownload(StreamInfo streamInfo);
+
+ StreamInfo queryDownloadByStreamId(String streamId);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java
index 931530aef..570718b30 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java
@@ -353,7 +353,7 @@ public interface IVideoManagerStorager {
* @param app
* @param stream
*/
- void removeMedia(String app, String stream);
+ int removeMedia(String app, String stream);
/**
@@ -366,7 +366,7 @@ public interface IVideoManagerStorager {
* @param app
* @param streamId
*/
- void mediaOutline(String app, String streamId);
+ int mediaOutline(String app, String streamId);
/**
* 设置平台在线/离线
@@ -406,4 +406,12 @@ public interface IVideoManagerStorager {
* @param channelId 通道ID
*/
void deviceChannelOffline(String deviceId, String channelId);
+
+ /**
+ * 通过app与stream获取StreamProxy
+ * @param app
+ * @param streamId
+ * @return
+ */
+ StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
index 230afbc9f..ebf4239af 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -28,7 +28,7 @@ public interface GbStreamMapper {
"latitude=#{latitude}," +
"mediaServerId=#{mediaServerId}," +
"status=${status} " +
- "WHERE app=#{app} AND stream=#{stream} AND gbId=#{gbId}")
+ "WHERE app=#{app} AND stream=#{stream}")
int update(GbStream gbStream);
@Delete("DELETE FROM gb_stream WHERE app=#{app} AND stream=#{stream}")
@@ -53,7 +53,7 @@ public interface GbStreamMapper {
@Update("UPDATE gb_stream " +
"SET status=${status} " +
"WHERE app=#{app} AND stream=#{stream}")
- void setStatus(String app, String stream, boolean status);
+ int setStatus(String app, String stream, boolean status);
@Select("SELECT gs.*, pgs.platformId FROM gb_stream gs LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream WHERE mediaServerId=#{mediaServerId} ")
List selectAllByMediaServerId(String mediaServerId);
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java
index 8d45b0543..8cd5d6a9a 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java
@@ -105,4 +105,7 @@ public interface MediaServerMapper {
@Select("SELECT * FROM media_server WHERE ip='${host}' and httpPort=${port}")
MediaServerItem queryOneByHostAndPort(String host, int port);
+
+ @Select("SELECT * FROM media_server WHERE defaultServer=1")
+ MediaServerItem queryDefault();
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
index 7346da5cd..11753f7e1 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
@@ -11,9 +11,10 @@ import java.util.List;
public interface StreamProxyMapper {
@Insert("INSERT INTO stream_proxy (type, app, stream,mediaServerId, url, src_url, dst_url, " +
- "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_hls, enable_mp4, enable, createTime) VALUES" +
+ "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_hls, enable_mp4, enable, enable_remove_none_reader, createTime) VALUES" +
"('${type}','${app}', '${stream}', '${mediaServerId}','${url}', '${src_url}', '${dst_url}', " +
- "'${timeout_ms}', '${ffmpeg_cmd_key}', '${rtp_type}', ${enable_hls}, ${enable_mp4}, ${enable}, '${createTime}' )")
+ "'${timeout_ms}', '${ffmpeg_cmd_key}', '${rtp_type}', ${enable_hls}, ${enable_mp4}, ${enable}, " +
+ "${enable_remove_none_reader}, '${createTime}' )")
int add(StreamProxyItem streamProxyDto);
@Update("UPDATE stream_proxy " +
@@ -29,6 +30,7 @@ public interface StreamProxyMapper {
"rtp_type=#{rtp_type}, " +
"enable_hls=#{enable_hls}, " +
"enable=#{enable}, " +
+ "enable_remove_none_reader=#{enable_remove_none_reader}, " +
"enable_mp4=#{enable_mp4} " +
"WHERE app=#{app} AND stream=#{stream}")
int update(StreamProxyItem streamProxyDto);
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
index e5b1269df..e98d8fad3 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.gb28181.bean.*;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
@@ -63,15 +64,15 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
streamInfo.getChannelId()));
}
@Override
- public StreamInfo queryPlayByStreamId(String steamId) {
- List