Merge remote-tracking branch 'origin/dev/压力测试' into dev/压力测试

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java
This commit is contained in:
lin 2026-01-19 21:47:59 +08:00
commit ec45deda5e

View File

@ -16,8 +16,6 @@ import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@ -181,131 +179,6 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
}
}
// @Scheduled(fixedDelay = 50)
// @Transactional
public void executeTaskQueue(){
if (taskQueue.isEmpty()) {
return;
}
List<HandlerCatchData> handlerCatchDataList = new ArrayList<>();
int size = taskQueue.size();
for (int i = 0; i < size; i++) {
HandlerCatchData poll = taskQueue.poll();
if (poll != null) {
handlerCatchDataList.add(poll);
}
}
if (handlerCatchDataList.isEmpty()) {
return;
}
for (HandlerCatchData take : handlerCatchDataList) {
if (take == null) {
continue;
}
RequestEvent evt = take.getEvt();
int sn = 0;
// 全局异常捕获保证下一条可以得到处理
try {
Element rootElement = null;
try {
rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset());
} catch (DocumentException e) {
log.error("[xml解析] 失败: ", e);
continue;
}
if (rootElement == null) {
log.warn("[ 收到通道 ] content cannot be null, {}", evt.getRequest());
continue;
}
Element deviceListElement = rootElement.element("DeviceList");
Element sumNumElement = rootElement.element("SumNum");
Element snElement = rootElement.element("SN");
sn = Integer.parseInt(snElement.getText());
int sumNum = Integer.parseInt(sumNumElement.getText());
if (sumNum == 0) {
log.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId());
// 数据已经完整接收
deviceChannelService.cleanChannelsForDevice(take.getDevice().getId());
// 推送空数据不然无法及时结束
catalogDataCatch.put(take.getDevice().getDeviceId(), sn, 0, take.getDevice(),
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), sn, null);
return;
} else {
Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
if (deviceListIterator != null) {
List<DeviceChannel> channelList = new ArrayList<>();
List<Region> regionList = new ArrayList<>();
List<Group> groupList = new ArrayList<>();
// 遍历DeviceList
while (deviceListIterator.hasNext()) {
Element itemDevice = deviceListIterator.next();
Element channelDeviceElement = itemDevice.element("DeviceID");
if (channelDeviceElement == null) {
// 总数减一 避免最后总数不对 无法确定问题
continue;
}
// 从xml解析内容到 DeviceChannel 对象
DeviceChannel channel = DeviceChannel.decode(itemDevice);
if (channel.getDeviceId() == null) {
log.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
continue;
}
channel.setDataDeviceId(take.getDevice().getId());
if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
channel.setParentId(null);
}
// 解析通道类型
if (channel.getDeviceId().length() <= 8) {
// 行政区划
Region region = Region.getInstance(channel);
regionList.add(region);
channel.setChannelType(1);
}else if (channel.getDeviceId().length() == 20){
// 业务分组/虚拟组织
Group group = Group.getInstance(channel);
if (group != null) {
channel.setParental(1);
channel.setChannelType(2);
groupList.add(group);
}
if (channel.getLongitude() != null && channel.getLatitude() != null && channel.getLongitude() > 0 && channel.getLatitude() > 0) {
Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(channel.getLongitude(), channel.getLatitude());
channel.setGbLongitude(wgs84Position[0]);
channel.setGbLatitude(wgs84Position[1]);
}
}
channelList.add(channel);
}
catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(),
channelList, regionList, groupList);
log.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.size(take.getDevice().getDeviceId(), sn), sumNum);
}
}
} catch (Exception e) {
log.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest());
log.error("[收到通道] 异常内容: ", e);
} finally {
String deviceId = take.getDevice().getDeviceId();
if (catalogDataCatch.size(deviceId, sn) > 0
&& catalogDataCatch.size(deviceId, sn) == catalogDataCatch.sumNum(deviceId, sn)) {
// 数据已经完整接收 此时可能存在某个设备离线变上线的情况但是考虑到性能此处不做处理
// 目前支持设备通道上线通知时和设备上线时向上级通知
boolean resetChannelsResult = saveData(take.getDevice(), sn);
if (!resetChannelsResult) {
String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(deviceId, sn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId(), sn).size() + "";
catalogDataCatch.setChannelSyncEnd(deviceId, sn, errorMsg);
} else {
catalogDataCatch.setChannelSyncEnd(deviceId, sn, null);
}
}
}
}
}
@Transactional
public boolean saveData(Device device, int sn) {