支持批量处理并优化异步消息发送

This commit is contained in:
lin 2026-04-19 10:14:09 +08:00
parent 9909aa9655
commit 36ea9e79a8
4 changed files with 37 additions and 26 deletions

View File

@ -115,7 +115,7 @@ public class EventPublisher {
public void mobilePositionEventPublish(MobilePosition mobilePosition) { public void mobilePositionEventPublish(MobilePosition mobilePosition) {
MobilePositionEvent event = new MobilePositionEvent(this); MobilePositionEvent event = new MobilePositionEvent(this);
event.setMobilePosition(mobilePosition); event.setMobilePositionList(List.of(mobilePosition));
applicationEventPublisher.publishEvent(event); applicationEventPublisher.publishEvent(event);
} }

View File

@ -13,10 +13,6 @@ public class MobilePositionEvent extends ApplicationEvent {
super(source); super(source);
} }
@Getter
@Setter
private MobilePosition mobilePosition;
@Getter @Getter
@Setter @Setter
private List<? extends MobilePosition> mobilePositionList; private List<? extends MobilePosition> mobilePositionList;

View File

@ -71,6 +71,10 @@ public class MobilePositionServiceImpl implements IMobilePositionService {
if (event.getMobilePositionList() == null || event.getMobilePositionList().isEmpty()) { if (event.getMobilePositionList() == null || event.getMobilePositionList().isEmpty()) {
return; return;
} }
if (event.getMobilePositionList().get(0).getChannelId() != null) {
mobilePositionQueue.addAll(event.getMobilePositionList());
return;
}
for (ISourceOtherService sourceOtherService : sourceOtherServiceMap.values()) { for (ISourceOtherService sourceOtherService : sourceOtherServiceMap.values()) {
try { try {
// 此时已经完成了通道ID的添加以及坐标系的转换后续只需要将数据保存到数据库即可 // 此时已经完成了通道ID的添加以及坐标系的转换后续只需要将数据保存到数据库即可

View File

@ -30,9 +30,11 @@ import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.*; import java.util.*;
import java.util.concurrent.Executors;
@Slf4j @Slf4j
@Service @Service
@ -322,32 +324,41 @@ public class CameraChannelService implements CommandLineRunner {
} }
// 监听GPS消息如果是移动设备则发送redis消息 // 监听GPS消息如果是移动设备则发送redis消息
@Async
@EventListener @EventListener
public void onApplicationEvent(MobilePositionEvent event) { public void onApplicationEvent(MobilePositionEvent event) {
MobilePosition mobilePosition = event.getMobilePosition(); List<? extends MobilePosition> mobilePositionList = event.getMobilePositionList();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (MobilePosition mobilePosition : mobilePositionList) {
executor.submit(() -> {
// 从redis补充信息
SYMember member = getMember(mobilePosition.getChannelDeviceId());
if (member == null) {
log.info("[SY-redis发送通知-移动设备位置信息] 缓存未获取 {}", mobilePosition.toString());
return;
}
// 从redis补充信息 // 发送redis消息
SYMember member = getMember(mobilePosition.getChannelDeviceId()); JSONObject jsonObject = new JSONObject();
if (member == null) { jsonObject.put("gpsDate", DateUtil.timestampMsTo_yyyy_MM_dd_HH_mm_ss(mobilePosition.getTimestamp()));
log.info("[SY-redis发送通知-移动设备位置信息] 缓存未获取 {}", mobilePosition.toString()); jsonObject.put("unicodeNo", member.getUnicodeNo());
return; jsonObject.put("memberNo", member.getNo());
jsonObject.put("unitNo", member.getUnitNo());
jsonObject.put("longitude", mobilePosition.getLongitude());
jsonObject.put("latitude", mobilePosition.getLatitude());
jsonObject.put("altitude", mobilePosition.getAltitude());
jsonObject.put("direction", mobilePosition.getDirection());
jsonObject.put("speed", mobilePosition.getSpeed());
jsonObject.put("blockId", member.getBlockId());
jsonObject.put("gbDeviceId", mobilePosition.getChannelDeviceId());
log.info("[SY-redis发送通知-移动设备位置信息] 发送 {}: {}", REDIS_GPS_MESSAGE, jsonObject.toString());
redisTemplateForString.convertAndSend(REDIS_GPS_MESSAGE, jsonObject.toString());
});
}
} }
// 发送redis消息
JSONObject jsonObject = new JSONObject();
jsonObject.put("gpsDate", DateUtil.timestampMsTo_yyyy_MM_dd_HH_mm_ss(mobilePosition.getTimestamp()));
jsonObject.put("unicodeNo", member.getUnicodeNo());
jsonObject.put("memberNo", member.getNo());
jsonObject.put("unitNo", member.getUnitNo());
jsonObject.put("longitude", mobilePosition.getLongitude());
jsonObject.put("latitude", mobilePosition.getLatitude());
jsonObject.put("altitude", mobilePosition.getAltitude());
jsonObject.put("direction", mobilePosition.getDirection());
jsonObject.put("speed", mobilePosition.getSpeed());
jsonObject.put("blockId", member.getBlockId());
jsonObject.put("gbDeviceId", mobilePosition.getChannelDeviceId());
log.info("[SY-redis发送通知-移动设备位置信息] 发送 {}: {}", REDIS_GPS_MESSAGE, jsonObject.toString());
redisTemplateForString.convertAndSend(REDIS_GPS_MESSAGE, jsonObject.toString());
} }
public SYMember getMember(String deviceId) { public SYMember getMember(String deviceId) {