重构移动位置通知处理逻辑,优化任务队列执行方式

This commit is contained in:
lin 2026-04-16 22:56:09 +08:00
parent 04789eecab
commit fb06fa85f1
4 changed files with 82 additions and 56 deletions

View File

@ -116,10 +116,18 @@ public class EventPublisher {
applicationEventPublisher.publishEvent(event); applicationEventPublisher.publishEvent(event);
} }
public void mobilePositionListEventPublish(List<MobilePosition> mobilePositionList) {
MobilePositionEvent event = new MobilePositionEvent(this);
event.setMobilePositionList(mobilePositionList);
applicationEventPublisher.publishEvent(event);
}
public void deviceOfflineEventPublish(Set<String> deviceIds) { public void deviceOfflineEventPublish(Set<String> deviceIds) {
DeviceOfflineEvent event = new DeviceOfflineEvent(this); DeviceOfflineEvent event = new DeviceOfflineEvent(this);
event.setDeviceIds(deviceIds); event.setDeviceIds(deviceIds);
applicationEventPublisher.publishEvent(event); applicationEventPublisher.publishEvent(event);
} }
} }

View File

@ -5,6 +5,8 @@ import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEvent;
import java.util.List;
public class MobilePositionEvent extends ApplicationEvent { public class MobilePositionEvent extends ApplicationEvent {
public MobilePositionEvent(Object source) { public MobilePositionEvent(Object source) {
@ -14,4 +16,8 @@ public class MobilePositionEvent extends ApplicationEvent {
@Getter @Getter
@Setter @Setter
private MobilePosition mobilePosition; private MobilePosition mobilePosition;
@Getter
@Setter
private List<MobilePosition> mobilePositionList;
} }

View File

@ -34,6 +34,7 @@ import jakarta.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element; import org.dom4j.Element;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@ -99,7 +100,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
* 监听录像查询结束事件 * 监听录像查询结束事件
*/ */
@Async @Async
@org.springframework.context.event.EventListener @EventListener
public void onApplicationEvent(RecordInfoEndEvent event) { public void onApplicationEvent(RecordInfoEndEvent event) {
SynchronousQueue<RecordInfo> queue = topicSubscribers.get("record" + event.getRecordInfo().getSn()); SynchronousQueue<RecordInfo> queue = topicSubscribers.get("record" + event.getRecordInfo().getSn());
if (queue != null) { if (queue != null) {
@ -107,8 +108,6 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
} }
} }
@Autowired
private ISIPCommander cmder;
@Override @Override
@ -293,7 +292,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
deviceChannel.getName(), deviceChannel.getDeviceId()); deviceChannel.getName(), deviceChannel.getDeviceId());
String cmdString = getText(rootElement, type.getVal()); String cmdString = getText(rootElement, type.getVal());
try { try {
cmder.fronEndCmd(device, deviceChannel.getDeviceId(), cmdString, errorResult->{ commander.fronEndCmd(device, deviceChannel.getDeviceId(), cmdString, errorResult->{
callback.run(errorResult.statusCode, errorResult.msg, null); callback.run(errorResult.statusCode, errorResult.msg, null);
}, errorResult->{ }, errorResult->{
callback.run(errorResult.statusCode, errorResult.msg, null); callback.run(errorResult.statusCode, errorResult.msg, null);

View File

@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd; package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
@ -14,6 +15,8 @@ import org.dom4j.DocumentException;
import org.dom4j.Element; import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.TaskExecutor; import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
@ -21,6 +24,7 @@ import javax.sip.RequestEvent;
import javax.sip.SipException; import javax.sip.SipException;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.text.ParseException; import java.text.ParseException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
@ -40,79 +44,88 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
private final IDeviceChannelService deviceChannelService; private final IDeviceChannelService deviceChannelService;
private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
private final TaskExecutor taskExecutor; private final TaskExecutor taskExecutor;
private final EventPublisher eventPublisher; private final EventPublisher eventPublisher;
private final UserSetting userSetting;
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
notifyMessageHandler.addHandler(cmdType, this); notifyMessageHandler.addHandler(cmdType, this);
} }
@Override @Override
public void handForDevice(RequestEvent evt, Device device, Element rootElement) { public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
boolean isEmpty = taskQueue.isEmpty(); log.error("[message-notify-移动位置] 待处理消息队列已满 {}返回486 BUSY_HERE", userSetting.getMaxNotifyCountQueue());
taskQueue.offer(new SipMsgInfo(evt, device, rootElement)); return;
}
taskQueue.offer(new HandlerCatchData(evt, null, null));
// 回复200 OK // 回复200 OK
try { try {
responseAckAsync((SIPRequest) evt.getRequest(), Response.OK); responseAckAsync((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 移动位置通知回复: {}", e.getMessage()); log.error("[命令发送失败] 移动位置通知回复: {}", e.getMessage());
} }
if (isEmpty) {
taskExecutor.execute(() -> { }
while (!taskQueue.isEmpty()) { @Scheduled(fixedDelay = 400) //每400毫秒执行一次
SipMsgInfo sipMsgInfo = taskQueue.poll(); @Async
try { public void executeTaskQueue(){
Element rootElementAfterCharset = getRootElement(sipMsgInfo.getEvt(), sipMsgInfo.getDevice().getCharset()); if (taskQueue.isEmpty()) {
if (rootElementAfterCharset == null) { return;
log.warn("[移动位置通知] {}处理失败,未识别到信息体", device.getDeviceId()); }
continue;
List<HandlerCatchData> handlerCatchDataList = new ArrayList<>();
int size = taskQueue.size();
for (int i = 0; i < size; i++) {
HandlerCatchData poll = taskQueue.poll();
if (poll != null) {
handlerCatchDataList.add(poll);
}
}
if (handlerCatchDataList.isEmpty()) {
return;
}
List<MobilePosition> mobilePositionList = new ArrayList<>();
for (HandlerCatchData take : handlerCatchDataList) {
if (take == null) {
continue;
}
Device device = take.getDevice();
try {
Element rootElementAfterCharset = getRootElement(take.getEvt(), device.getCharset());
if (rootElementAfterCharset == null) {
log.warn("[移动位置通知] {}处理失败,未识别到信息体", device.getDeviceId());
List<MobilePosition> mobilePositions = MobilePosition.decode(device.getName(), device.getDeviceId(), rootElementAfterCharset);
for (MobilePosition mobilePosition : mobilePositions) {
try {
mobilePosition.setReportSource("Mobile Position");
log.info("[收到移动位置订阅通知]{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelDeviceId(),
mobilePosition.getLongitude(), mobilePosition.getLatitude(), mobilePosition.getTime());
mobilePositionList.add(mobilePosition);
}catch (Exception e) {
log.error("未处理的异常 ", e);
} }
List<MobilePosition> mobilePositionList = MobilePosition.decode(sipMsgInfo.getDevice().getName(), sipMsgInfo.getDevice().getDeviceId(), rootElementAfterCharset);
mobilePositionList.forEach(mobilePosition -> {
try {
// 更新device channel 的经纬度
DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), mobilePosition.getChannelDeviceId());
if (deviceChannel == null) {
log.warn("[解析移动位置通知] 未找到通道:{}/{}", device.getDeviceId(), mobilePosition.getChannelDeviceId());
return;
}
mobilePosition.setChannelId(deviceChannel.getId());
mobilePosition.setReportSource("Mobile Position");
log.info("[收到移动位置订阅通知]{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelDeviceId(),
mobilePosition.getLongitude(), mobilePosition.getLatitude(), mobilePosition.getTime());
mobilePositionService.add(mobilePosition);
// 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
try {
eventPublisher.mobilePositionEventPublish(mobilePosition);
}catch (Exception e) {
log.error("[MobilePositionEvent] 发送失败: ", e);
}
deviceChannel.setLongitude(mobilePosition.getLongitude());
deviceChannel.setLatitude(mobilePosition.getLatitude());
deviceChannel.setGpsTime(mobilePosition.getTime());
deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
}catch (Exception e) {
log.error("未处理的异常 ", e);
}
});
} catch (DocumentException e) {
log.error("未处理的异常 ", e);
} catch (Exception e) {
log.warn("[移动位置通知] 发现未处理的异常, \r\n{}", evt.getRequest());
log.error("[移动位置通知] 异常内容: ", e);
} }
} }
}); }catch (Exception e) {
log.warn("[移动位置通知] 发现未处理的异常, \r\n{}", take.getEvt().getRequest());
log.error("[移动位置通知] 异常内容: ", e);
}
}
// 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
if (!mobilePositionList.isEmpty()) {
try {
eventPublisher.mobilePositionListEventPublish(mobilePositionList);
}catch (Exception e) {
log.error("[MobilePositionEvent] 发送失败: ", e);
}
} }
} }