异步处理移动位置消息

This commit is contained in:
lin 2026-01-12 15:21:22 +08:00
parent 29321beeaf
commit 8babccae0d
2 changed files with 12 additions and 7 deletions

View File

@ -5,7 +5,6 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.ISIPResponseProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.timeout.ITimeoutProcessor;
import gov.nist.javax.sip.message.SIPRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -13,10 +12,10 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.sip.*;
import javax.sip.header.*;
import javax.sip.header.CSeqHeader;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.net.InetAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

View File

@ -19,6 +19,7 @@ import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
@ -56,7 +57,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
private IMobilePositionService mobilePositionService;
public void process(RequestEvent evt) {
logger.warn("[notify-移动位置-未解析] 收到移动位置请求,{}", evt.getRequest());
if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
logger.error("[notify-移动位置] 待处理消息队列已满 {}返回486 BUSY_HERE消息不做处理", userSetting.getMaxNotifyCountQueue());
return;
@ -64,7 +65,8 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
taskQueue.offer(new HandlerCatchData(evt, null, null));
}
@Scheduled(fixedRate = 400) //每200毫秒执行一次
@Scheduled(fixedRate = 1000) //每200毫秒执行一次
@Async("taskExecutor")
public void executeTaskQueue() {
if (taskQueue.isEmpty()) {
return;
@ -99,6 +101,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
logger.error("处理MobilePosition移动位置Notify时未获取到device,{}", deviceId);
continue;
}
String sn = null;
MobilePosition mobilePosition = new MobilePosition();
mobilePosition.setDeviceId(device.getDeviceId());
mobilePosition.setDeviceName(device.getName());
@ -150,11 +153,14 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
mobilePosition.setAltitude(0.0);
}
break;
case "SN":
sn = element.getStringValue();
break;
}
}
logger.info("[收到移动位置订阅通知]{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
logger.info("[收到移动位置订阅通知]{}/{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
sn, mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
mobilePosition.setReportSource("Mobile Position");
if (mobilePosition.getChannelId() == null || mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) ) {