Compare commits

..

1 Commits

Author SHA1 Message Date
山海紫穹
0b78ae3591
Pre Merge pull request !45 from 山海紫穹/dev260107_2 2026-06-08 04:40:18 +00:00
3 changed files with 57 additions and 47 deletions

View File

@ -36,8 +36,6 @@ import java.util.stream.Collectors;
@RequiredArgsConstructor
public class PlatformChannelServiceImpl implements IPlatformChannelService {
private static final int BATCH_SIZE = 500;
private final PlatformChannelMapper platformChannelMapper;
private final EventPublisher eventPublisher;
@ -452,7 +450,34 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
}
List<CommonGBChannel> channelListShare = platformChannelMapper.queryShare(platformId, null);
Assert.notEmpty(channelListShare, "未共享任何通道");
return removeChannelsFromDb(platform, platformId, channelListShare);
int result = platformChannelMapper.removeChannelsWithPlatform(platformId, channelListShare);
if (result > 0) {
// 查询通道相关的分组信息
Set<Region> regionSet = regionMapper.queryByChannelList(channelListShare);
Set<Region> deleteRegion = deleteEmptyRegion(regionSet, platformId);
if (!deleteRegion.isEmpty()) {
for (Region region : deleteRegion) {
channelListShare.add(0, CommonGBChannel.build(region));
}
}
// 查询通道相关的分组信息
Set<Group> groupSet = groupMapper.queryByChannelList(channelListShare);
Set<Group> deleteGroup = deleteEmptyGroup(groupSet, platformId);
if (!deleteGroup.isEmpty()) {
for (Group group : deleteGroup) {
channelListShare.add(0, CommonGBChannel.build(group));
}
}
// 发送消息
try {
// 发送catalog
eventPublisher.catalogEventPublish(platform, channelListShare, CatalogEvent.DEL);
} catch (Exception e) {
log.warn("[移除全部关联通道] 发送失败,数量:{}", channelListShare.size(), e);
}
}
return result;
}
@Override
@ -469,44 +494,6 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
removeChannels(platformId, channelList);
}
private <T> List<List<T>> partition(List<T> list, int size) {
List<List<T>> result = new ArrayList<>();
for (int i = 0; i < list.size(); i += size) {
result.add(list.subList(i, Math.min(i + size, list.size())));
}
return result;
}
private int removeChannelsFromDb(Platform platform, Integer platformId, List<CommonGBChannel> channelList) {
List<List<CommonGBChannel>> batches = partition(channelList, BATCH_SIZE);
int totalResult = 0;
for (List<CommonGBChannel> batch : batches) {
totalResult += platformChannelMapper.removeChannelsWithPlatform(platformId, batch);
}
if (totalResult > 0) {
Set<Region> regionSet = regionMapper.queryByChannelList(channelList);
Set<Region> deleteRegion = deleteEmptyRegion(regionSet, platformId);
if (!deleteRegion.isEmpty()) {
for (Region region : deleteRegion) {
channelList.add(0, CommonGBChannel.build(region));
}
}
Set<Group> groupSet = groupMapper.queryByChannelList(channelList);
Set<Group> deleteGroup = deleteEmptyGroup(groupSet, platformId);
if (!deleteGroup.isEmpty()) {
for (Group group : deleteGroup) {
channelList.add(0, CommonGBChannel.build(group));
}
}
try {
eventPublisher.catalogEventPublish(platform, channelList, CatalogEvent.DEL);
} catch (Exception e) {
log.warn("[取消共享通道] 发送失败,数量:{}", channelList.size(), e);
}
}
return totalResult;
}
@Transactional
public int removeChannelList(Integer platformId, List<CommonGBChannel> channelList) {
Platform platform = platformMapper.query(platformId);
@ -526,12 +513,36 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
}
return result;
}
int result = removeChannelsFromDb(platform, platformId, channelList);
String deviceIds = channelList.stream().map(CommonGBChannel::getGbDeviceId).collect(Collectors.joining(","));
int result = platformChannelMapper.removeChannelsWithPlatform(platformId, channelList);
if (result <= 0) {
String deviceIds = channelList.stream().map(CommonGBChannel::getGbDeviceId).collect(Collectors.joining(","));
log.info("[取消共享通道] 平台{}未关联通道: {}", platformId, deviceIds);
return 0;
}
// 查询通道相关的分组信息
Set<Region> regionSet = regionMapper.queryByChannelList(channelList);
Set<Region> deleteRegion = deleteEmptyRegion(regionSet, platformId);
if (!deleteRegion.isEmpty()) {
for (Region region : deleteRegion) {
channelList.add(0, CommonGBChannel.build(region));
}
}
// 查询通道相关的分组信息
Set<Group> groupSet = groupMapper.queryByChannelList(channelList);
Set<Group> deleteGroup = deleteEmptyGroup(groupSet, platformId);
if (!deleteGroup.isEmpty()) {
for (Group group : deleteGroup) {
channelList.add(0, CommonGBChannel.build(group));
}
}
// 发送消息
try {
// 发送catalog
eventPublisher.catalogEventPublish(platform, channelList, CatalogEvent.DEL);
} catch (Exception e) {
log.warn("[取消共享通道] 发送失败,数量:{}", channelList.size(), e);
}
return result;
}

View File

@ -861,9 +861,7 @@ public class PlayServiceImpl implements IPlayService {
log.info("[Invite 200OK] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
String oldStreamId = String.format("%08x", Long.parseLong(ssrcInfo.getSsrc())).toUpperCase();
String newStreamId = String.format("%08x", Long.parseLong(ssrcInResponse)).toUpperCase();
if (!mediaServerItem.isRtpEnable()) { // 多端口时按照端口绑定了stream即使stream与ssrc不一致也不会影响
receiveRtpServerService.refreshAuthenticateInfo(oldStreamId, newStreamId);
}
receiveRtpServerService.refreshAuthenticateInfo(oldStreamId, newStreamId);
// ssrc 不一致
if (mediaServerItem.isRtpEnable()) {
// 多端口

View File

@ -177,7 +177,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
String sendSsrc = sendSsrcFactory.getSendSsrc(
"Play".equalsIgnoreCase(finalInviteInfo.getSessionName()) ? "0" : "1");
finalInviteInfo.setSsrc(sendSsrc);
log.info("[上级INVITE] 使用自定义SSRC: {}", sendSsrc);
}
// 构建sendRTP内容
SendRtpInfo sendRtpItem = sendRtpServerService.createSendRtpInfo(streamInfo.getMediaServer(),
@ -225,6 +224,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 上级INVITE 发送 200SDP: {}", e.getMessage());
}
}
}));
}