Compare commits

..

1 Commits

Author SHA1 Message Date
阿斌
1ca4955a69
Pre Merge pull request !36 from 阿斌/N/A 2026-05-22 06:15:03 +00:00
46 changed files with 477 additions and 1257 deletions

View File

@ -128,11 +128,6 @@ public class UserSetting {
*/
private Boolean useCustomSsrcForParentInvite = Boolean.TRUE;
/**
* 多端口模式使用随机SSRC端口区分流SSRC允许重复
*/
private Boolean ssrcRandom = Boolean.FALSE;
/**
* 开启接口文档页面 默认开启生产环境建议关闭遇到swagger相关的漏洞时也可以关闭
*/

View File

@ -10,6 +10,8 @@ public class InviteMessageInfo {
private String sourceChannelId;
private String sessionName;
private String ssrc;
private String allocatedSsrc;
private String allocatedSsrcMediaServerId;
private boolean tcp;
private boolean tcpActive;
private String callId;

View File

@ -24,6 +24,11 @@ public class SendRtpInfo {
*/
private String ssrc;
/**
* 从SSRC池中分配的SSRC
*/
private String allocatedSsrc;
/**
* 目标平台或设备的编号
*/
@ -248,5 +253,7 @@ public class SendRtpInfo {
}
}
public String getSsrcToRelease() {
return allocatedSsrc;
}
}

View File

@ -47,6 +47,11 @@ public class SsrcTransaction {
*/
private String ssrc;
/**
* 从SSRC池中分配的SSRC
*/
private String allocatedSsrc;
/**
* 事务信息
*/
@ -89,5 +94,7 @@ public class SsrcTransaction {
public SsrcTransaction() {
}
public String getSsrcToRelease() {
return allocatedSsrc;
}
}

View File

@ -3,9 +3,6 @@ package com.genersoft.iot.vmp.gb28181.dao;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.controller.bean.Extent;
import com.genersoft.iot.vmp.gb28181.dao.provider.ChannelProvider;
import com.genersoft.iot.vmp.gb28181.dao.provider.MysqlCommonChannelUpsertProvider;
import com.genersoft.iot.vmp.gb28181.dao.provider.PostgresStyleCommonChannelUpsertProvider;
import com.genersoft.iot.vmp.gb28181.dao.provider.H2CommonChannelUpsertProvider;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
import com.genersoft.iot.vmp.web.custom.bean.CameraChannel;
@ -176,12 +173,6 @@ public interface CommonGBChannelMapper {
List<CommonGBChannel> queryInListByStatus(List<CommonGBChannel> commonGBChannelList, @Param("status") String status);
@InsertProvider(type = MysqlCommonChannelUpsertProvider.class, method = "batchUpsert", databaseId = "mysql")
@InsertProvider(type = PostgresStyleCommonChannelUpsertProvider.class, method = "batchUpsert", databaseId = "postgresql")
@InsertProvider(type = PostgresStyleCommonChannelUpsertProvider.class, method = "batchUpsert", databaseId = "kingbase")
@InsertProvider(type = H2CommonChannelUpsertProvider.class, method = "batchUpsert", databaseId = "h2")
int batchUpsert(@Param("channels") List<CommonGBChannel> channels);
@Insert(" <script>" +
"INSERT INTO wvp_device_channel (" +
"gb_device_id," +

View File

@ -5,9 +5,6 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.DeviceMobilePosition;
import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce;
import com.genersoft.iot.vmp.gb28181.dao.provider.DeviceChannelProvider;
import com.genersoft.iot.vmp.gb28181.dao.provider.MysqlChannelUpsertProvider;
import com.genersoft.iot.vmp.gb28181.dao.provider.PostgresStyleUpsertProvider;
import com.genersoft.iot.vmp.gb28181.dao.provider.H2ChannelUpsertProvider;
import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;
@ -199,12 +196,6 @@ public interface DeviceChannelMapper {
@Update(value = {"UPDATE wvp_device_channel SET status='OFF' WHERE id=#{id}"})
void offline(@Param("id") int id);
@InsertProvider(type = MysqlChannelUpsertProvider.class, method = "batchUpsert", databaseId = "mysql")
@InsertProvider(type = PostgresStyleUpsertProvider.class, method = "batchUpsert", databaseId = "postgresql")
@InsertProvider(type = PostgresStyleUpsertProvider.class, method = "batchUpsert", databaseId = "kingbase")
@InsertProvider(type = H2ChannelUpsertProvider.class, method = "batchUpsert", databaseId = "h2")
int batchUpsert(@Param("channels") List<DeviceChannel> channels);
@Insert("<script> " +
"insert into wvp_device_channel " +
"(device_id, data_type, data_device_id, name, manufacturer, model, owner, civil_code, block, " +

View File

@ -1,25 +0,0 @@
package com.genersoft.iot.vmp.gb28181.dao.provider;
import java.util.Map;
public class H2ChannelUpsertProvider {
public String batchUpsert(Map<String, Object> params) {
return "<script>" +
"MERGE INTO wvp_device_channel (" +
"data_device_id, device_id, data_type, name, manufacturer, model, owner, civil_code, block, " +
"address, parental, parent_id, safety_way, register_way, cert_num, certifiable, err_code, end_time, secrecy, " +
"ip_address, port, password, status, longitude, latitude, ptz_type, position_type, room_type, use_type, " +
"supply_light_type, direction_type, resolution, business_group_id, download_speed, svc_space_support_mod, " +
"svc_time_support_mode, create_time, update_time, sub_count, stream_id, has_audio, gps_time, stream_identification, channel_type" +
") KEY (data_device_id, device_id) VALUES " +
"<foreach collection='channels' item='item' separator=','>" +
"(#{item.dataDeviceId}, #{item.deviceId}, #{item.dataType}, #{item.name}, #{item.manufacturer}, #{item.model}, #{item.owner}, #{item.civilCode}, #{item.block}, " +
"#{item.address}, #{item.parental}, #{item.parentId}, #{item.safetyWay}, #{item.registerWay}, #{item.certNum}, #{item.certifiable}, #{item.errCode}, #{item.endTime}, #{item.secrecy}, " +
"#{item.ipAddress}, #{item.port}, #{item.password}, #{item.status}, #{item.longitude}, #{item.latitude}, #{item.ptzType}, #{item.positionType}, #{item.roomType}, #{item.useType}, " +
"#{item.supplyLightType}, #{item.directionType}, #{item.resolution}, #{item.businessGroupId}, #{item.downloadSpeed}, #{item.svcSpaceSupportMod}," +
" #{item.svcTimeSupportMode}, #{item.createTime}, #{item.updateTime}, #{item.subCount}, #{item.streamId}, #{item.hasAudio}, #{item.gpsTime}, #{item.streamIdentification}, #{item.channelType})" +
"</foreach>" +
"</script>";
}
}

View File

@ -1,29 +0,0 @@
package com.genersoft.iot.vmp.gb28181.dao.provider;
import java.util.Map;
public class H2CommonChannelUpsertProvider {
public String batchUpsert(Map<String, Object> params) {
return "<script>" +
"MERGE INTO wvp_device_channel (" +
"gb_device_id, data_type, data_device_id, create_time, update_time, " +
"gb_name, gb_manufacturer, gb_model, gb_owner, gb_civil_code, gb_block, gb_address, " +
"gb_parental, gb_parent_id, gb_safety_way, gb_register_way, gb_cert_num, gb_certifiable, " +
"gb_err_code, gb_end_time, gb_secrecy, gb_ip_address, gb_port, gb_password, gb_status, " +
"gb_longitude, gb_latitude, gb_ptz_type, gb_position_type, gb_room_type, gb_use_type, " +
"gb_supply_light_type, gb_direction_type, gb_resolution, gb_business_group_id, " +
"gb_download_speed, gb_svc_space_support_mod, gb_svc_time_support_mode, enable_broadcast" +
") KEY (gb_device_id) VALUES " +
"<foreach collection='channels' item='item' separator=','>" +
"(#{item.gbDeviceId}, #{item.dataType}, #{item.dataDeviceId}, #{item.createTime}, #{item.updateTime}," +
"#{item.gbName}, #{item.gbManufacturer}, #{item.gbModel}, #{item.gbOwner}, #{item.gbCivilCode}, #{item.gbBlock}, #{item.gbAddress}," +
"#{item.gbParental}, #{item.gbParentId}, #{item.gbSafetyWay}, #{item.gbRegisterWay}, #{item.gbCertNum}, #{item.gbCertifiable}," +
"#{item.gbErrCode}, #{item.gbEndTime}, #{item.gbSecrecy}, #{item.gbIpAddress}, #{item.gbPort}, #{item.gbPassword}, #{item.gbStatus}," +
"#{item.gbLongitude}, #{item.gbLatitude}, #{item.gbPtzType}, #{item.gbPositionType}, #{item.gbRoomType}, #{item.gbUseType}," +
"#{item.gbSupplyLightType}, #{item.gbDirectionType}, #{item.gbResolution}, #{item.gbBusinessGroupId}," +
"#{item.gbDownloadSpeed}, #{item.gbSvcSpaceSupportMod}, #{item.gbSvcTimeSupportMode}, #{item.enableBroadcast})" +
"</foreach>" +
"</script>";
}
}

View File

@ -1,43 +0,0 @@
package com.genersoft.iot.vmp.gb28181.dao.provider;
import java.util.Map;
public class MysqlChannelUpsertProvider {
public String batchUpsert(Map<String, Object> params) {
return "<script>" +
"INSERT INTO wvp_device_channel (" +
"device_id, data_type, data_device_id, name, manufacturer, model, owner, civil_code, block, " +
"address, parental, parent_id, safety_way, register_way, cert_num, certifiable, err_code, end_time, secrecy, " +
"ip_address, port, password, status, longitude, latitude, ptz_type, position_type, room_type, use_type, " +
"supply_light_type, direction_type, resolution, business_group_id, download_speed, svc_space_support_mod, " +
"svc_time_support_mode, create_time, update_time, sub_count, stream_id, has_audio, gps_time, stream_identification, channel_type" +
") VALUES " +
"<foreach collection='channels' item='item' separator=','>" +
"(#{item.deviceId}, #{item.dataType}, #{item.dataDeviceId}, #{item.name}, #{item.manufacturer}, #{item.model}, #{item.owner}, #{item.civilCode}, #{item.block}, " +
"#{item.address}, #{item.parental}, #{item.parentId}, #{item.safetyWay}, #{item.registerWay}, #{item.certNum}, #{item.certifiable}, #{item.errCode}, #{item.endTime}, #{item.secrecy}, " +
"#{item.ipAddress}, #{item.port}, #{item.password}, #{item.status}, #{item.longitude}, #{item.latitude}, #{item.ptzType}, #{item.positionType}, #{item.roomType}, #{item.useType}, " +
"#{item.supplyLightType}, #{item.directionType}, #{item.resolution}, #{item.businessGroupId}, #{item.downloadSpeed}, #{item.svcSpaceSupportMod}," +
" #{item.svcTimeSupportMode}, #{item.createTime}, #{item.updateTime}, #{item.subCount}, #{item.streamId}, #{item.hasAudio}, #{item.gpsTime}, #{item.streamIdentification}, #{item.channelType})" +
"</foreach>" +
"ON DUPLICATE KEY UPDATE " +
"name=VALUES(name), manufacturer=VALUES(manufacturer), model=VALUES(model), owner=VALUES(owner), " +
"civil_code=VALUES(civil_code), block=VALUES(block), address=VALUES(address), " +
"parental=VALUES(parental), parent_id=VALUES(parent_id), safety_way=VALUES(safety_way), " +
"register_way=VALUES(register_way), cert_num=VALUES(cert_num), certifiable=VALUES(certifiable), " +
"err_code=VALUES(err_code), end_time=VALUES(end_time), secrecy=VALUES(secrecy), " +
"ip_address=VALUES(ip_address), port=VALUES(port), password=VALUES(password), " +
"status=VALUES(status), longitude=VALUES(longitude), latitude=VALUES(latitude), " +
"ptz_type=VALUES(ptz_type), position_type=VALUES(position_type), room_type=VALUES(room_type), " +
"use_type=VALUES(use_type), supply_light_type=VALUES(supply_light_type), " +
"direction_type=VALUES(direction_type), resolution=VALUES(resolution), " +
"business_group_id=VALUES(business_group_id), download_speed=VALUES(download_speed), " +
"svc_space_support_mod=VALUES(svc_space_support_mod), " +
"svc_time_support_mode=VALUES(svc_time_support_mode), " +
"update_time=VALUES(update_time), sub_count=VALUES(sub_count), " +
"stream_id=VALUES(stream_id), has_audio=VALUES(has_audio), " +
"gps_time=VALUES(gps_time), stream_identification=VALUES(stream_identification), " +
"channel_type=VALUES(channel_type)" +
"</script>";
}
}

View File

@ -1,46 +0,0 @@
package com.genersoft.iot.vmp.gb28181.dao.provider;
import java.util.Map;
public class MysqlCommonChannelUpsertProvider {
public String batchUpsert(Map<String, Object> params) {
return "<script>" +
"INSERT INTO wvp_device_channel (" +
"gb_device_id, data_type, data_device_id, create_time, update_time, " +
"gb_name, gb_manufacturer, gb_model, gb_owner, gb_civil_code, gb_block, gb_address, " +
"gb_parental, gb_parent_id, gb_safety_way, gb_register_way, gb_cert_num, gb_certifiable, " +
"gb_err_code, gb_end_time, gb_secrecy, gb_ip_address, gb_port, gb_password, gb_status, " +
"gb_longitude, gb_latitude, gb_ptz_type, gb_position_type, gb_room_type, gb_use_type, " +
"gb_supply_light_type, gb_direction_type, gb_resolution, gb_business_group_id, " +
"gb_download_speed, gb_svc_space_support_mod, gb_svc_time_support_mode, enable_broadcast" +
") VALUES " +
"<foreach collection='channels' item='item' separator=','>" +
"(#{item.gbDeviceId}, #{item.dataType}, #{item.dataDeviceId}, #{item.createTime}, #{item.updateTime}," +
"#{item.gbName}, #{item.gbManufacturer}, #{item.gbModel}, #{item.gbOwner}, #{item.gbCivilCode}, #{item.gbBlock}, #{item.gbAddress}," +
"#{item.gbParental}, #{item.gbParentId}, #{item.gbSafetyWay}, #{item.gbRegisterWay}, #{item.gbCertNum}, #{item.gbCertifiable}," +
"#{item.gbErrCode}, #{item.gbEndTime}, #{item.gbSecrecy}, #{item.gbIpAddress}, #{item.gbPort}, #{item.gbPassword}, #{item.gbStatus}," +
"#{item.gbLongitude}, #{item.gbLatitude}, #{item.gbPtzType}, #{item.gbPositionType}, #{item.gbRoomType}, #{item.gbUseType}," +
"#{item.gbSupplyLightType}, #{item.gbDirectionType}, #{item.gbResolution}, #{item.gbBusinessGroupId}," +
"#{item.gbDownloadSpeed}, #{item.gbSvcSpaceSupportMod}, #{item.gbSvcTimeSupportMode}, #{item.enableBroadcast})" +
"</foreach>" +
"ON DUPLICATE KEY UPDATE " +
"gb_name=VALUES(gb_name), gb_manufacturer=VALUES(gb_manufacturer), gb_model=VALUES(gb_model), " +
"gb_owner=VALUES(gb_owner), gb_civil_code=VALUES(gb_civil_code), gb_block=VALUES(gb_block), " +
"gb_address=VALUES(gb_address), gb_parental=VALUES(gb_parental), gb_parent_id=VALUES(gb_parent_id), " +
"gb_safety_way=VALUES(gb_safety_way), gb_register_way=VALUES(gb_register_way), " +
"gb_cert_num=VALUES(gb_cert_num), gb_certifiable=VALUES(gb_certifiable), " +
"gb_err_code=VALUES(gb_err_code), gb_end_time=VALUES(gb_end_time), gb_secrecy=VALUES(gb_secrecy), " +
"gb_ip_address=VALUES(gb_ip_address), gb_port=VALUES(gb_port), gb_password=VALUES(gb_password), " +
"gb_status=VALUES(gb_status), gb_longitude=VALUES(gb_longitude), gb_latitude=VALUES(gb_latitude), " +
"gb_ptz_type=VALUES(gb_ptz_type), gb_position_type=VALUES(gb_position_type), " +
"gb_room_type=VALUES(gb_room_type), gb_use_type=VALUES(gb_use_type), " +
"gb_supply_light_type=VALUES(gb_supply_light_type), gb_direction_type=VALUES(gb_direction_type), " +
"gb_resolution=VALUES(gb_resolution), gb_business_group_id=VALUES(gb_business_group_id), " +
"gb_download_speed=VALUES(gb_download_speed), " +
"gb_svc_space_support_mod=VALUES(gb_svc_space_support_mod), " +
"gb_svc_time_support_mode=VALUES(gb_svc_time_support_mode), " +
"enable_broadcast=VALUES(enable_broadcast), update_time=VALUES(update_time)" +
"</script>";
}
}

View File

@ -1,46 +0,0 @@
package com.genersoft.iot.vmp.gb28181.dao.provider;
import java.util.Map;
public class PostgresStyleCommonChannelUpsertProvider {
public String batchUpsert(Map<String, Object> params) {
return "<script>" +
"INSERT INTO wvp_device_channel (" +
"gb_device_id, data_type, data_device_id, create_time, update_time, " +
"gb_name, gb_manufacturer, gb_model, gb_owner, gb_civil_code, gb_block, gb_address, " +
"gb_parental, gb_parent_id, gb_safety_way, gb_register_way, gb_cert_num, gb_certifiable, " +
"gb_err_code, gb_end_time, gb_secrecy, gb_ip_address, gb_port, gb_password, gb_status, " +
"gb_longitude, gb_latitude, gb_ptz_type, gb_position_type, gb_room_type, gb_use_type, " +
"gb_supply_light_type, gb_direction_type, gb_resolution, gb_business_group_id, " +
"gb_download_speed, gb_svc_space_support_mod, gb_svc_time_support_mode, enable_broadcast" +
") VALUES " +
"<foreach collection='channels' item='item' separator=','>" +
"(#{item.gbDeviceId}, #{item.dataType}, #{item.dataDeviceId}, #{item.createTime}, #{item.updateTime}," +
"#{item.gbName}, #{item.gbManufacturer}, #{item.gbModel}, #{item.gbOwner}, #{item.gbCivilCode}, #{item.gbBlock}, #{item.gbAddress}," +
"#{item.gbParental}, #{item.gbParentId}, #{item.gbSafetyWay}, #{item.gbRegisterWay}, #{item.gbCertNum}, #{item.gbCertifiable}," +
"#{item.gbErrCode}, #{item.gbEndTime}, #{item.gbSecrecy}, #{item.gbIpAddress}, #{item.gbPort}, #{item.gbPassword}, #{item.gbStatus}," +
"#{item.gbLongitude}, #{item.gbLatitude}, #{item.gbPtzType}, #{item.gbPositionType}, #{item.gbRoomType}, #{item.gbUseType}," +
"#{item.gbSupplyLightType}, #{item.gbDirectionType}, #{item.gbResolution}, #{item.gbBusinessGroupId}," +
"#{item.gbDownloadSpeed}, #{item.gbSvcSpaceSupportMod}, #{item.gbSvcTimeSupportMode}, #{item.enableBroadcast})" +
"</foreach>" +
"ON CONFLICT (gb_device_id) DO UPDATE SET " +
"gb_name=EXCLUDED.gb_name, gb_manufacturer=EXCLUDED.gb_manufacturer, gb_model=EXCLUDED.gb_model, " +
"gb_owner=EXCLUDED.gb_owner, gb_civil_code=EXCLUDED.gb_civil_code, gb_block=EXCLUDED.gb_block, " +
"gb_address=EXCLUDED.gb_address, gb_parental=EXCLUDED.gb_parental, gb_parent_id=EXCLUDED.gb_parent_id, " +
"gb_safety_way=EXCLUDED.gb_safety_way, gb_register_way=EXCLUDED.gb_register_way, " +
"gb_cert_num=EXCLUDED.gb_cert_num, gb_certifiable=EXCLUDED.gb_certifiable, " +
"gb_err_code=EXCLUDED.gb_err_code, gb_end_time=EXCLUDED.gb_end_time, gb_secrecy=EXCLUDED.gb_secrecy, " +
"gb_ip_address=EXCLUDED.gb_ip_address, gb_port=EXCLUDED.gb_port, gb_password=EXCLUDED.gb_password, " +
"gb_status=EXCLUDED.gb_status, gb_longitude=EXCLUDED.gb_longitude, gb_latitude=EXCLUDED.gb_latitude, " +
"gb_ptz_type=EXCLUDED.gb_ptz_type, gb_position_type=EXCLUDED.gb_position_type, " +
"gb_room_type=EXCLUDED.gb_room_type, gb_use_type=EXCLUDED.gb_use_type, " +
"gb_supply_light_type=EXCLUDED.gb_supply_light_type, gb_direction_type=EXCLUDED.gb_direction_type, " +
"gb_resolution=EXCLUDED.gb_resolution, gb_business_group_id=EXCLUDED.gb_business_group_id, " +
"gb_download_speed=EXCLUDED.gb_download_speed, " +
"gb_svc_space_support_mod=EXCLUDED.gb_svc_space_support_mod, " +
"gb_svc_time_support_mode=EXCLUDED.gb_svc_time_support_mode, " +
"enable_broadcast=EXCLUDED.enable_broadcast, update_time=EXCLUDED.update_time" +
"</script>";
}
}

View File

@ -1,43 +0,0 @@
package com.genersoft.iot.vmp.gb28181.dao.provider;
import java.util.Map;
public class PostgresStyleUpsertProvider {
public String batchUpsert(Map<String, Object> params) {
return "<script>" +
"INSERT INTO wvp_device_channel (" +
"device_id, data_type, data_device_id, name, manufacturer, model, owner, civil_code, block, " +
"address, parental, parent_id, safety_way, register_way, cert_num, certifiable, err_code, end_time, secrecy, " +
"ip_address, port, password, status, longitude, latitude, ptz_type, position_type, room_type, use_type, " +
"supply_light_type, direction_type, resolution, business_group_id, download_speed, svc_space_support_mod, " +
"svc_time_support_mode, create_time, update_time, sub_count, stream_id, has_audio, gps_time, stream_identification, channel_type" +
") VALUES " +
"<foreach collection='channels' item='item' separator=','>" +
"(#{item.deviceId}, #{item.dataType}, #{item.dataDeviceId}, #{item.name}, #{item.manufacturer}, #{item.model}, #{item.owner}, #{item.civilCode}, #{item.block}, " +
"#{item.address}, #{item.parental}, #{item.parentId}, #{item.safetyWay}, #{item.registerWay}, #{item.certNum}, #{item.certifiable}, #{item.errCode}, #{item.endTime}, #{item.secrecy}, " +
"#{item.ipAddress}, #{item.port}, #{item.password}, #{item.status}, #{item.longitude}, #{item.latitude}, #{item.ptzType}, #{item.positionType}, #{item.roomType}, #{item.useType}, " +
"#{item.supplyLightType}, #{item.directionType}, #{item.resolution}, #{item.businessGroupId}, #{item.downloadSpeed}, #{item.svcSpaceSupportMod}," +
" #{item.svcTimeSupportMode}, #{item.createTime}, #{item.updateTime}, #{item.subCount}, #{item.streamId}, #{item.hasAudio}, #{item.gpsTime}, #{item.streamIdentification}, #{item.channelType})" +
"</foreach>" +
"ON CONFLICT (data_device_id, device_id) DO UPDATE SET " +
"name=EXCLUDED.name, manufacturer=EXCLUDED.manufacturer, model=EXCLUDED.model, owner=EXCLUDED.owner, " +
"civil_code=EXCLUDED.civil_code, block=EXCLUDED.block, address=EXCLUDED.address, " +
"parental=EXCLUDED.parental, parent_id=EXCLUDED.parent_id, safety_way=EXCLUDED.safety_way, " +
"register_way=EXCLUDED.register_way, cert_num=EXCLUDED.cert_num, certifiable=EXCLUDED.certifiable, " +
"err_code=EXCLUDED.err_code, end_time=EXCLUDED.end_time, secrecy=EXCLUDED.secrecy, " +
"ip_address=EXCLUDED.ip_address, port=EXCLUDED.port, password=EXCLUDED.password, " +
"status=EXCLUDED.status, longitude=EXCLUDED.longitude, latitude=EXCLUDED.latitude, " +
"ptz_type=EXCLUDED.ptz_type, position_type=EXCLUDED.position_type, room_type=EXCLUDED.room_type, " +
"use_type=EXCLUDED.use_type, supply_light_type=EXCLUDED.supply_light_type, " +
"direction_type=EXCLUDED.direction_type, resolution=EXCLUDED.resolution, " +
"business_group_id=EXCLUDED.business_group_id, download_speed=EXCLUDED.download_speed, " +
"svc_space_support_mod=EXCLUDED.svc_space_support_mod, " +
"svc_time_support_mode=EXCLUDED.svc_time_support_mode, " +
"update_time=EXCLUDED.update_time, sub_count=EXCLUDED.sub_count, " +
"stream_id=EXCLUDED.stream_id, has_audio=EXCLUDED.has_audio, " +
"gps_time=EXCLUDED.gps_time, stream_identification=EXCLUDED.stream_identification, " +
"channel_type=EXCLUDED.channel_type" +
"</script>";
}
}

View File

@ -114,58 +114,75 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
if (CollectionUtils.isEmpty(channels)) {
return 0;
}
// 入参去重
Set<String> dedupSet = new HashSet<>();
List<DeviceChannel> uniqueChannels = new ArrayList<>();
for (DeviceChannel ch : channels) {
if (dedupSet.add(ch.getDeviceId())) {
uniqueChannels.add(ch);
}
}
List<DeviceChannel> upsertChannels = new ArrayList<>();
List<DeviceChannel> addChannels = new ArrayList<>();
List<DeviceChannel> updateChannels = new ArrayList<>();
HashMap<String, DeviceChannel> channelsInStore = new HashMap<>();
int result = 0;
List<DeviceChannel> channelList = channelMapper.queryChannelsByDeviceDbId(device.getId());
if (channelList.isEmpty()) {
String now = DateUtil.getNow();
for (DeviceChannel channel : uniqueChannels) {
for (DeviceChannel channel : channels) {
channel.setDataDeviceId(device.getId());
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId());
if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
channel.setStreamId(inviteInfo.getStreamInfo().getStream());
}
String now = DateUtil.getNow();
channel.setUpdateTime(now);
channel.setCreateTime(now);
upsertChannels.add(channel);
addChannels.add(channel);
}
}else {
HashMap<String, DeviceChannel> channelsInStore = new HashMap<>();
for (DeviceChannel deviceChannel : channelList) {
channelsInStore.put(deviceChannel.getDataDeviceId() + deviceChannel.getDeviceId(), deviceChannel);
}
String now = DateUtil.getNow();
for (DeviceChannel channel : uniqueChannels) {
for (DeviceChannel channel : channels) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId());
if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
channel.setStreamId(inviteInfo.getStreamInfo().getStream());
}
String now = DateUtil.getNow();
channel.setUpdateTime(now);
DeviceChannel deviceChannelInDb = channelsInStore.get(channel.getDataDeviceId() + channel.getDeviceId());
if ( deviceChannelInDb != null) {
channel.setId(deviceChannelInDb.getId());
channel.setCreateTime(deviceChannelInDb.getCreateTime());
channel.setUpdateTime(now);
updateChannels.add(channel);
}else {
channel.setCreateTime(now);
}
upsertChannels.add(channel);
channel.setUpdateTime(now);
addChannels.add(channel);
}
}
}
Set<String> channelSet = new HashSet<>();
// 滤重
List<DeviceChannel> addChannelList = new ArrayList<>();
List<DeviceChannel> updateChannelList = new ArrayList<>();
addChannels.forEach(channel -> {
if (channelSet.add(channel.getDeviceId())) {
addChannelList.add(channel);
}
});
channelSet.clear();
updateChannels.forEach(channel -> {
if (channelSet.add(channel.getDeviceId())) {
updateChannelList.add(channel);
}
});
int limitCount = 500;
if (!upsertChannels.isEmpty()) {
for (int i = 0; i < upsertChannels.size(); i += limitCount) {
int end = Math.min(i + limitCount, upsertChannels.size());
List<DeviceChannel> batchList = upsertChannels.subList(i, end);
result += channelMapper.batchUpsert(batchList);
if (!addChannelList.isEmpty()) {
for (int i = 0; i < addChannelList.size(); i += limitCount) {
int end = Math.min(i + limitCount, addChannelList.size());
List<DeviceChannel> batchList = addChannelList.subList(i, end);
result += channelMapper.batchAdd(batchList);
}
}
if (!updateChannelList.isEmpty()) {
for (int i = 0; i < updateChannelList.size(); i += limitCount) {
int end = Math.min(i + limitCount, updateChannelList.size());
List<DeviceChannel> batchList = updateChannelList.subList(i, end);
result += channelMapper.batchUpdate(batchList);
}
}
return result;
@ -368,26 +385,21 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
allChannelMap.put(deviceChannel.getDataDeviceId() + deviceChannel.getDeviceId(), deviceChannel);
}
}
// 入参去重
Set<String> dedupSet = new HashSet<>();
List<DeviceChannel> uniqueChannels = new ArrayList<>();
for (DeviceChannel ch : deviceChannelList) {
if (dedupSet.add(ch.getDeviceId())) {
uniqueChannels.add(ch);
}
}
// 数据去重
List<DeviceChannel> channels = new ArrayList<>();
List<DeviceChannel> upsertChannels = new ArrayList<>();
List<DeviceChannel> updateChannels = new ArrayList<>();
List<DeviceChannel> addChannels = new ArrayList<>();
List<DeviceChannel> deleteChannels = new ArrayList<>();
StringBuilder stringBuilder = new StringBuilder();
Map<String, Integer> subContMap = new HashMap<>();
for (DeviceChannel deviceChannel : uniqueChannels) {
for (DeviceChannel deviceChannel : deviceChannelList) {
DeviceChannel channelInDb = allChannelMap.get(deviceChannel.getDataDeviceId() + deviceChannel.getDeviceId());
if (channelInDb != null) {
deviceChannel.setStreamId(channelInDb.getStreamId());
deviceChannel.setHasAudio(channelInDb.isHasAudio());
deviceChannel.setId(channelInDb.getId());
deviceChannel.setCreateTime(channelInDb.getCreateTime());
if (channelInDb.getStatus() != null && !channelInDb.getStatus().equalsIgnoreCase(deviceChannel.getStatus())){
List<Platform> platformList = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getDeviceId());
if (!CollectionUtils.isEmpty(platformList)){
@ -397,12 +409,14 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
}
}
deviceChannel.setUpdateTime(DateUtil.getNow());
updateChannels.add(deviceChannel);
}else {
deviceChannel.setCreateTime(DateUtil.getNow());
deviceChannel.setUpdateTime(DateUtil.getNow());
addChannels.add(deviceChannel);
}
allChannelMap.remove(deviceChannel.getDataDeviceId() + deviceChannel.getDeviceId());
upsertChannels.add(deviceChannel);
channels.add(deviceChannel);
if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) {
if (subContMap.get(deviceChannel.getParentId()) == null) {
subContMap.put(deviceChannel.getParentId(), 1);
@ -413,8 +427,8 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
}
}
deleteChannels.addAll(allChannelMap.values());
for (DeviceChannel channel : upsertChannels) {
if (!channels.isEmpty()) {
for (DeviceChannel channel : channels) {
if (subContMap.get(channel.getDeviceId()) != null){
Integer count = subContMap.get(channel.getDeviceId());
if (count > 0) {
@ -423,19 +437,32 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
}
}
}
}
if(CollectionUtils.isEmpty(upsertChannels)){
if (stringBuilder.length() > 0) {
log.info("[目录查询]收到的数据存在重复: {}" , stringBuilder);
}
if(CollectionUtils.isEmpty(channels)){
log.info("通道重设,数据为空={}" , deviceChannelList);
return false;
}
int limitCount = 500;
if (!upsertChannels.isEmpty()) {
for (int i = 0; i < upsertChannels.size(); i += limitCount) {
int end = Math.min(i + limitCount, upsertChannels.size());
List<DeviceChannel> batchList = upsertChannels.subList(i, end);
channelMapper.batchUpsert(batchList);
if (!addChannels.isEmpty()) {
for (int i = 0; i < addChannels.size(); i += limitCount) {
int end = Math.min(i + limitCount, addChannels.size());
List<DeviceChannel> batchList = addChannels.subList(i, end);
channelMapper.batchAdd(batchList);
}
}
if (!updateChannels.isEmpty()) {
for (int i = 0; i < updateChannels.size(); i += limitCount) {
int end = Math.min(i + limitCount, updateChannels.size());
List<DeviceChannel> batchList = updateChannels.subList(i, end);
channelMapper.batchUpdate(batchList);
}
// 不对收到的通道做比较已确定是否真的发生变化所以不发送更新通知
}
if (!deleteChannels.isEmpty()) {
try {
// 这些通道可能关联了上级平台需要删除同时发送消息

View File

@ -266,6 +266,7 @@ public class DeviceServiceImpl implements IDeviceService {
List<SsrcTransaction> ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(device.getDeviceId());
if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) {
for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrcToRelease());
receiveRtpServerService.closeRTPServerByMediaServerId(ssrcTransaction.getMediaServerId(), ssrcTransaction.getApp(), ssrcTransaction.getStream());
sessionManager.removeByCallId(ssrcTransaction.getCallId());
}

View File

@ -325,13 +325,13 @@ public class GbChannelServiceImpl implements IGbChannelService {
log.warn("[新增多个通道] 通道数量为0更新失败");
return;
}
// 批量保存使用UPSERT防重复
// 批量保存
int limitCount = 1000;
int result = 0;
for (int i = 0; i < commonGBChannels.size(); i += limitCount) {
int end = Math.min(i + limitCount, commonGBChannels.size());
List<CommonGBChannel> batchList = commonGBChannels.subList(i, end);
result += commonGBChannelMapper.batchUpsert(batchList);
result += commonGBChannelMapper.batchAdd(batchList);
}
try {
// 发送catalog

View File

@ -128,6 +128,7 @@ public class PlatformServiceImpl implements IPlatformService {
}
sendRtpServerService.delete(sendRtpItem);
if (mediaServerItem != null) {
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrcToRelease());
boolean stopResult = mediaServerService.initStopSendRtp(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
if (stopResult) {
Platform platform = queryPlatformByServerGBId(sendRtpItem.getTargetId());
@ -334,6 +335,7 @@ public class PlatformServiceImpl implements IPlatformService {
if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp()) && sendRtpItem.isSendToPlatform()) {
Platform platform = platformMapper.getParentPlatByServerGBId(sendRtpItem.getTargetId());
CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId());
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrcToRelease());
try {
commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel);
} catch (SipException | InvalidArgumentException | ParseException e) {
@ -520,6 +522,7 @@ public class PlatformServiceImpl implements IPlatformService {
List<SendRtpInfo> sendRtpItems = sendRtpServerService.queryForPlatform(platformId);
if (sendRtpItems != null && !sendRtpItems.isEmpty()) {
for (SendRtpInfo sendRtpItem : sendRtpItems) {
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrcToRelease());
sendRtpServerService.delete(sendRtpItem);
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
@ -623,6 +626,7 @@ public class PlatformServiceImpl implements IPlatformService {
log.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} finally {
timeoutCallback.run(1, "收流超时");
mediaServerService.releaseSsrc(mediaServerItem.getId(), data.getSsrcInfo().getSsrcToRelease());
receiveRtpServerService.closeRTPServer(mediaServerItem, data.getSsrcInfo().getApp(), data.getSsrcInfo().getStream());
sessionManager.removeByStream(data.getSsrcInfo().getApp(), data.getSsrcInfo().getStream());
}
@ -698,6 +702,7 @@ public class PlatformServiceImpl implements IPlatformService {
// ssrc检验
// 更新ssrc
log.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
releaseAllocatedSsrc(mediaServerItem, ssrcInfo);
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse);
if (!result) {
try {
@ -706,6 +711,8 @@ public class PlatformServiceImpl implements IPlatformService {
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage());
} finally {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrcToRelease());
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
@ -717,6 +724,7 @@ public class PlatformServiceImpl implements IPlatformService {
}
}else {
ssrcInfo.setSsrc(ssrcInResponse);
updateSsrcTransaction(ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse, null);
inviteInfo.setSsrcInfo(ssrcInfo);
inviteInfo.setStream(ssrcInfo.getStream());
if (userSetting.getBroadcastForPlatform().equalsIgnoreCase("TCP-ACTIVE")) {
@ -730,7 +738,9 @@ public class PlatformServiceImpl implements IPlatformService {
inviteStreamService.updateInviteInfo(inviteInfo);
}
}else {
releaseAllocatedSsrc(mediaServerItem, ssrcInfo);
ssrcInfo.setSsrc(ssrcInResponse);
updateSsrcTransaction(ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse, null);
inviteInfo.setSsrcInfo(ssrcInfo);
inviteInfo.setStream(ssrcInfo.getStream());
if (userSetting.getBroadcastForPlatform().equalsIgnoreCase("TCP-ACTIVE")) {
@ -751,6 +761,7 @@ public class PlatformServiceImpl implements IPlatformService {
if (ssrcTransaction == null) {
return;
}
releaseAllocatedSsrc(mediaServerItem, ssrcInfo);
sessionManager.removeByStream(ssrcInfo.getApp(), inviteInfo.getStream());
inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse);
@ -759,6 +770,7 @@ public class PlatformServiceImpl implements IPlatformService {
ssrcTransaction.setApp(ssrcInfo.getApp());
ssrcTransaction.setStream(inviteInfo.getStream());
ssrcTransaction.setSsrc(ssrcInResponse);
ssrcTransaction.setAllocatedSsrc(null);
ssrcTransaction.setMediaServerId(mediaServerItem.getId());
ssrcTransaction.setSipTransactionInfo(new SipTransactionInfo((SIPResponse) responseEvent.getResponse()));
ssrcTransaction.setType(inviteSessionType);
@ -769,6 +781,25 @@ public class PlatformServiceImpl implements IPlatformService {
}
}
private void releaseAllocatedSsrc(MediaServer mediaServerItem, SSRCInfo ssrcInfo) {
if (ssrcInfo == null || ssrcInfo.getAllocatedSsrc() == null) {
return;
}
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getAllocatedSsrc());
ssrcInfo.setAllocatedSsrc(null);
}
private void updateSsrcTransaction(String app, String stream, String ssrc, String allocatedSsrc) {
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(app, stream);
if (ssrcTransaction == null) {
return;
}
ssrcTransaction.setSsrc(ssrc);
ssrcTransaction.setAllocatedSsrc(allocatedSsrc);
sessionManager.put(ssrcTransaction);
}
private void tcpActiveHandler(Platform platform, CommonGBChannel channel, String contentString,
MediaServer mediaServerItem, boolean ssrcCheck,
SSRCInfo ssrcInfo, ErrorCallback<Object> callback){
@ -799,6 +830,8 @@ public class PlatformServiceImpl implements IPlatformService {
} catch (SdpException e) {
log.error("[TCP主动连接对方] serverGbId: {}, channelId: {}, 解析200OK的SDP信息失败", platform.getServerGBId(), channel.getGbDeviceId(), e);
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrcToRelease());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
@ -822,6 +855,8 @@ public class PlatformServiceImpl implements IPlatformService {
receiveRtpServerService.closeRTPServer(mediaServerItem, app, stream);
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, channel.getGbId(), stream);
if (inviteInfo != null) {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), inviteInfo.getSsrcInfo().getSsrcToRelease());
inviteStreamService.removeInviteInfo(inviteInfo);
}
sessionManager.removeByStream(app, stream);

View File

@ -13,7 +13,6 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.service.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.SendSsrcFactory;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@ -110,9 +109,6 @@ public class PlayServiceImpl implements IPlayService {
@Autowired
private SSRCFactory ssrcFactory;
@Autowired
private SendSsrcFactory sendSsrcFactory;
@Autowired
private IPlatformService platformService;
@ -339,6 +335,9 @@ public class PlayServiceImpl implements IPlayService {
InviteInfo inviteInfoInCatch = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId());
if (inviteInfoInCatch != null ) {
if (inviteInfoInCatch.getStreamInfo() == null) {
// 释放生成的ssrc使用上一次申请的
ssrcFactory.releaseSsrc(mediaServer.getId(), ssrc);
// 点播发起了但是尚未成功, 仅注册回调等待结果即可
inviteStreamService.once(InviteSessionType.PLAY, channel.getId(), null, callback);
log.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId({}): {}", device.getDeviceId(), channel.getDeviceId(), channel.getId());
@ -481,23 +480,25 @@ public class PlayServiceImpl implements IPlayService {
private void talk(MediaServer mediaServerItem, Device device, DeviceChannel channel, String stream,
SipSubscribe.Event errorEvent, Runnable timeoutCallback, AudioBroadcastEvent audioEvent) {
String ySsrc = ssrcFactory.getPlaySsrc(mediaServerItem);
String playSsrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId());
if (ySsrc == null) {
if (playSsrc == null) {
audioEvent.call("ssrc已经用尽");
return;
}
String sendSsrc = sendSsrcFactory.getSendSsrc("0");
SendRtpInfo sendRtpInfo;
try {
sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServerItem, null, null, sendSsrc, device.getDeviceId(), MediaStreamUtil.GB28181_TALK, stream,
sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServerItem, null, null, playSsrc, device.getDeviceId(), MediaStreamUtil.GB28181_TALK, stream,
channel.getId(), true, false);
if (sendRtpInfo == null) {
ssrcFactory.releaseSsrc(mediaServerItem.getId(), playSsrc);
audioEvent.call("获取发流端口失败");
return;
}
sendRtpInfo.setAllocatedSsrc(playSsrc);
sendRtpInfo.setPlayType(InviteStreamType.TALK);
}catch (PlayException e) {
ssrcFactory.releaseSsrc(mediaServerItem.getId(), playSsrc);
log.info("[语音对讲]开始 获取发流端口失败 deviceId: {}, channelId: {},", device.getDeviceId(), channel.getDeviceId());
return;
}
@ -524,6 +525,7 @@ public class PlayServiceImpl implements IPlayService {
log.error("[语音对讲]超时, 发送BYE失败 {}", e.getMessage());
} finally {
timeoutCallback.run();
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease());
sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream());
}
}, userSetting.getPlayTimeout());
@ -532,6 +534,7 @@ public class PlayServiceImpl implements IPlayService {
Integer localPort = mediaServerService.startSendRtpPassive(mediaServerItem, sendRtpInfo, userSetting.getPlayTimeout() * 1000);
if (localPort == null || localPort <= 0) {
timeoutCallback.run();
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease());
sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream());
return;
}
@ -540,6 +543,7 @@ public class PlayServiceImpl implements IPlayService {
receiveRtpServerService.addAuthenticateInfoForGb28181Talk(mediaServerItem, sendRtpInfo.getStream());
}catch (ControllerException e) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease());
log.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channel.getDeviceId());
audioEvent.call("失败, " + e.getMessage());
// 查看是否已经建立了通道存在则发送bye
@ -549,7 +553,7 @@ public class PlayServiceImpl implements IPlayService {
// 查看设备是否已经在推流
try {
cmder.talkStreamCmd(mediaServerItem, sendRtpInfo, ySsrc, device, channel, callId, (hookData) -> {
cmder.talkStreamCmd(mediaServerItem, sendRtpInfo, device, channel, callId, (hookData) -> {
log.info("[语音对讲] 流已生成, 开始推流: " + hookData);
dynamicTask.stop(timeOutTaskKey);
// TODO 暂不做处理
@ -584,6 +588,8 @@ public class PlayServiceImpl implements IPlayService {
}, (event) -> {
dynamicTask.stop(timeOutTaskKey);
receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease());
sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream());
errorEvent.response(event);
}, userSetting.getPlayTimeout().longValue());
@ -592,6 +598,9 @@ public class PlayServiceImpl implements IPlayService {
log.error("[命令发送失败] 对讲消息: {}", e.getMessage());
dynamicTask.stop(timeOutTaskKey);
receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease());
sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream());
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
@ -866,6 +875,7 @@ public class PlayServiceImpl implements IPlayService {
// ssrc检验
// 更新ssrc
log.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
releaseAllocatedSsrc(mediaServerItem, ssrcInfo);
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse);
if (!result) {
try {
@ -875,6 +885,8 @@ public class PlayServiceImpl implements IPlayService {
log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage());
}
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrcToRelease());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
@ -885,6 +897,7 @@ public class PlayServiceImpl implements IPlayService {
}else {
ssrcInfo.setSsrc(ssrcInResponse);
updateSsrcTransaction(ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse, null);
inviteInfo.setSsrcInfo(ssrcInfo);
inviteInfo.setStream(ssrcInfo.getStream());
if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
@ -897,7 +910,9 @@ public class PlayServiceImpl implements IPlayService {
inviteStreamService.updateInviteInfo(inviteInfo);
}
} else {
releaseAllocatedSsrc(mediaServerItem, ssrcInfo);
ssrcInfo.setSsrc(ssrcInResponse);
updateSsrcTransaction(ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse, null);
inviteInfo.setSsrcInfo(ssrcInfo);
inviteInfo.setStream(ssrcInfo.getStream());
if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
@ -917,12 +932,14 @@ public class PlayServiceImpl implements IPlayService {
if (ssrcTransaction == null) {
return;
}
releaseAllocatedSsrc(mediaServerItem, ssrcInfo);
sessionManager.removeByStream(MediaStreamUtil.RTP_APP, inviteInfo.getStream());
inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse);
ssrcTransaction.setDeviceId(device.getDeviceId());
ssrcTransaction.setChannelId(ssrcTransaction.getChannelId());
ssrcTransaction.setCallId(ssrcTransaction.getCallId());
ssrcTransaction.setSsrc(ssrcInResponse);
ssrcTransaction.setAllocatedSsrc(null);
ssrcTransaction.setApp(MediaStreamUtil.RTP_APP);
ssrcTransaction.setStream(inviteInfo.getStream());
ssrcTransaction.setMediaServerId(mediaServerItem.getId());
@ -935,6 +952,24 @@ public class PlayServiceImpl implements IPlayService {
}
}
private void releaseAllocatedSsrc(MediaServer mediaServerItem, SSRCInfo ssrcInfo) {
if (ssrcInfo == null || ssrcInfo.getAllocatedSsrc() == null) {
return;
}
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getAllocatedSsrc());
ssrcInfo.setAllocatedSsrc(null);
}
private void updateSsrcTransaction(String app, String stream, String ssrc, String allocatedSsrc) {
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(app, stream);
if (ssrcTransaction == null) {
return;
}
ssrcTransaction.setSsrc(ssrc);
ssrcTransaction.setAllocatedSsrc(allocatedSsrc);
sessionManager.put(ssrcTransaction);
}
@Override
public void download(Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, ErrorCallback<StreamInfo> callback) {
@ -1568,6 +1603,8 @@ public class PlayServiceImpl implements IPlayService {
mediaServerService.stopSendRtp(mediaServer, sendRtpInfo.getApp(), sendRtpInfo.getStream(), sendRtpInfo.getSsrc());
}
ssrcFactory.releaseSsrc(mediaServerId, sendRtpInfo.getSsrcToRelease());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream());
if (ssrcTransaction != null) {
try {

View File

@ -19,7 +19,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
@Component
@ -39,16 +38,6 @@ public class CatalogDataManager{
private final Map<String, CatalogData> dataMap = new ConcurrentHashMap<>();
private final Map<String, ReentrantLock> deviceWriteLocks = new ConcurrentHashMap<>();
public ReentrantLock getDeviceWriteLock(String deviceId) {
return deviceWriteLocks.computeIfAbsent(deviceId, k -> new ReentrantLock());
}
public void removeDeviceWriteLock(String deviceId) {
deviceWriteLocks.remove(deviceId);
}
private final String key = "VMP_CATALOG_DATA";
public String buildMapKey(String deviceId, int sn ) {
@ -251,14 +240,9 @@ public class CatalogDataManager{
}else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) {
if ( catalogData.getTime().isBefore(instantBefore5S)) {
String deviceId = catalogData.getDevice().getDeviceId();
ReentrantLock lock = getDeviceWriteLock(deviceId);
if (!lock.tryLock()) {
// saveData() 正在执行跳过本次等下一个5s周期
continue;
}
try {
int sn = catalogData.getSn();
List<DeviceChannel> deviceChannelList = getDeviceChannelList(deviceId, sn);
try {
if (catalogData.getTotal() == deviceChannelList.size()) {
deviceChannelService.resetChannels(catalogData.getDevice().getId(), deviceChannelList);
}else {
@ -272,23 +256,16 @@ public class CatalogDataManager{
if (groupList != null && !groupList.isEmpty()) {
groupService.batchAdd(groupList);
}
String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + deviceChannelList.size() + "";
catalogData.setErrorMsg(errorMsg);
}catch (Exception e) {
log.error("[国标通道同步] 入库失败: ", e);
} finally {
lock.unlock();
}
String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + deviceChannelList.size() + "";
catalogData.setErrorMsg(errorMsg);
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
}
}else {
if (catalogData.getTime().isBefore(instantBefore30S)) {
String deviceId = catalogData.getDevice().getDeviceId();
dataMap.remove(dataKey);
// 清理可能残留的设备锁
if (deviceWriteLocks.containsKey(deviceId)) {
deviceWriteLocks.remove(deviceId);
}
Set<String> redisKeysForChannel = catalogData.getRedisKeysForChannel();
if (redisKeysForChannel != null && !redisKeysForChannel.isEmpty()) {
for (String deleteKey : redisKeysForChannel) {

View File

@ -1,42 +1,35 @@
package com.genersoft.iot.vmp.gb28181.session;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.ZLMResult;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.BitSet;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.Set;
/**
* ssrc使用
*/
@Slf4j
@Component
public class SSRCFactory {
private final ConcurrentHashMap<String, BitSet> usedMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Object> lockMap = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "ssrc-rebuild");
t.setDaemon(true);
return t;
});
/**
* 播流最大并发个数
*/
private static final Integer MAX_STREAM_COUNT = 10000;
/**
* 播流最大并发个数
*/
private static final String SSRC_INFO_KEY = "VMP_SSRC_INFO_";
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils;
@Autowired
private IMediaServerService mediaServerService;
private StringRedisTemplate redisTemplate;
@Autowired
private SipConfig sipConfig;
@ -44,95 +37,107 @@ public class SSRCFactory {
@Autowired
private UserSetting userSetting;
private String domainPart;
@PostConstruct
public void init() {
String sipDomain = sipConfig.getDomain();
domainPart = sipDomain.length() >= 8 ? sipDomain.substring(3, 8) : sipDomain;
scheduler.scheduleAtFixedRate(this::rebuild, 10, 30, TimeUnit.SECONDS);
public void initMediaServerSSRC(String mediaServerId, Set<String> usedSet) {
String ssrcPrefix = getSsrcPrefix();
String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId;
List<String> ssrcList = new ArrayList<>();
for (int i = 1; i < MAX_STREAM_COUNT; i++) {
String ssrc = String.format("%s%04d", ssrcPrefix, i);
if (null == usedSet || !usedSet.contains(ssrc)) {
ssrcList.add(ssrc);
}
}
if (redisTemplate.opsForSet().size(redisKey) != null) {
redisTemplate.delete(redisKey);
}
redisTemplate.opsForSet().add(redisKey, ssrcList.toArray(new String[0]));
}
/**
* 获取视频预览的SSRC值,第一位固定为0
*
* @return ssrc
*/
public String getPlaySsrc(String mediaServerId) {
String suffix = allocate(mediaServerId);
return suffix != null ? "0" + suffix : null;
return "0" + getSN(mediaServerId);
}
/**
* 获取录像回放的SSRC值,第一位固定为1
*/
public String getPlayBackSsrc(String mediaServerId) {
String suffix = allocate(mediaServerId);
return suffix != null ? "1" + suffix : null;
return "1" + getSN(mediaServerId);
}
public String getPlaySsrc(MediaServer mediaServer) {
if (mediaServer.isRtpEnable() && userSetting.getSsrcRandom()) {
return "0" + domainPart + String.format("%04d", ThreadLocalRandom.current().nextInt(10000));
/**
* 释放ssrc主要用完的ssrc一定要释放否则会耗尽
*
* @param ssrc 需要重置的ssrc
*/
public void releaseSsrc(String mediaServerId, String ssrc) {
if (ssrc == null) {
return;
}
return getPlaySsrc(mediaServer.getId());
if (!isFactorySsrc(ssrc)) {
log.warn("[释放 SSRC] 忽略非SSRC池分配的值: {}", ssrc);
return;
}
String sn = ssrc.substring(1);
log.debug("[释放 SSRC] SSRC:{} -> SN: {}", ssrc, sn);
String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId;
redisTemplate.opsForSet().add(redisKey, sn);
}
public String getPlayBackSsrc(MediaServer mediaServer) {
if (mediaServer.isRtpEnable() && userSetting.getSsrcRandom()) {
return "1" + domainPart + String.format("%04d", ThreadLocalRandom.current().nextInt(10000));
}
return getPlayBackSsrc(mediaServer.getId());
}
private String allocate(String mediaServerId) {
synchronized (lockMap.computeIfAbsent(mediaServerId, k -> new Object())) {
BitSet bits = usedMap.computeIfAbsent(mediaServerId, k -> new BitSet(10000));
int start = ThreadLocalRandom.current().nextInt(10000);
int index = start;
do {
if (!bits.get(index)) {
bits.set(index);
return domainPart + String.format("%04d", index);
}
index = (index + 1) % 10000;
} while (index != start);
log.warn("[SSRC] 媒体节点 {} 的SSRC已用尽", mediaServerId);
return null;
}
}
void rebuild() {
List<MediaServer> servers = mediaServerService.getAll();
for (MediaServer server : servers) {
if (server.isRtpEnable() && userSetting.getSsrcRandom()) {
continue;
}
synchronized (lockMap.computeIfAbsent(server.getId(), k -> new Object())) {
BitSet bits = new BitSet(10000);
int count = 0;
try {
ZLMResult<?> result = zlmresTfulUtils.getMediaList(server, null, null, "rtsp", null);
if (result != null && result.getCode() == 0 && result.getData() != null) {
List<JSONObject> list = (List<JSONObject>) result.getData();
for (JSONObject obj : list) {
if (obj.getIntValue("originType") != 3) continue;
String originUrl = obj.getString("originUrl");
if (originUrl == null) continue;
int idx = originUrl.lastIndexOf("/rtp/");
if (idx == -1) continue;
try {
int suffix = (int) (Long.parseLong(originUrl.substring(idx + 5), 16) % 10000);
bits.set(suffix);
count++;
} catch (NumberFormatException ignored) {
}
}
}
} catch (Exception e) {
log.warn("[SSRC重建] 查询媒体节点 {} 失败: {}", server.getId(), e.getMessage());
}
usedMap.put(server.getId(), bits);
if (count > 8000) {
log.info("[SSRC重建] 媒体节点 {} 的SSRC使用率已超过80%,请注意扩展服务提升性能", server.getId());
/**
* 获取后四位数SN,随机数
*/
private String getSN(String mediaServerId) {
String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId;
Long size = redisTemplate.opsForSet().size(redisKey);
if (size == null || size == 0) {
log.info("[获取 SSRC 失败] redisKey {}", redisKey);
throw new RuntimeException("ssrc已经用完");
} else {
if (log.isDebugEnabled()) {
log.debug("[SSRC重建] 节点 {} 已占用 {} 个SSRC", server.getId(), count);
// 在集合中移除并返回一个随机成员
return redisTemplate.opsForSet().pop(redisKey);
}
}
/**
* 重置一个流媒体服务的所有ssrc
*
* @param mediaServerId 流媒体服务ID
*/
public void reset(String mediaServerId) {
this.initMediaServerSSRC(mediaServerId, null);
}
/**
* 是否已经存在了某个MediaServer的SSRC信息
*
* @param mediaServerId 流媒体服务ID
*/
public boolean hasMediaServerSSRC(String mediaServerId) {
String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId;
return Boolean.TRUE.equals(redisTemplate.hasKey(redisKey));
}
private String getSsrcPrefix() {
String sipDomain = sipConfig.getDomain();
return sipDomain.length() >= 8 ? sipDomain.substring(3, 8) : sipDomain;
}
private boolean isFactorySsrc(String ssrc) {
if (ssrc.length() < 2) {
return false;
}
String sn = ssrc.substring(1);
String ssrcPrefix = getSsrcPrefix();
return sn.length() == ssrcPrefix.length() + 4 && sn.startsWith(ssrcPrefix);
}
}

View File

@ -1,29 +0,0 @@
package com.genersoft.iot.vmp.gb28181.session;
import com.genersoft.iot.vmp.conf.SipConfig;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.ThreadLocalRandom;
@Slf4j
@Component
public class SendSsrcFactory {
@Autowired
private SipConfig sipConfig;
private String domainPart;
@PostConstruct
public void init() {
String sipDomain = sipConfig.getDomain();
domainPart = sipDomain.length() >= 8 ? sipDomain.substring(3, 8) : sipDomain;
}
public String getSendSsrc(String prefix) {
return prefix + domainPart + String.format("%04d", ThreadLocalRandom.current().nextInt(10000));
}
}

View File

@ -94,7 +94,7 @@ public interface ISIPCommander {
*/
void streamByeCmd(Device device, String channelId, String app, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, String ySsrc, Device device, DeviceChannel channelId, String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException;
void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, Device device, DeviceChannel channelId, String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException;
void streamByeCmd(Device device, String channelId, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;

View File

@ -281,6 +281,7 @@ public class SIPCommander implements ISIPCommander {
Request request = headerProvider.createInviteRequest(device, channel.getDeviceId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> {
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrcToRelease());
errorEvent.response(e);
}), e -> {
ResponseEvent responseEvent = (ResponseEvent) e.event;
@ -289,6 +290,7 @@ public class SIPCommander implements ISIPCommander {
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(),
callId,ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response,
InviteSessionType.PLAY);
ssrcTransaction.setAllocatedSsrc(ssrcInfo.getAllocatedSsrc());
sessionManager.put(ssrcTransaction);
okEvent.response(e);
}, timeout);
@ -386,6 +388,7 @@ public class SIPCommander implements ISIPCommander {
channel.getId(), sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),
device.getTransport()).getCallId(), ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInfo.getSsrc(),
mediaServerItem.getId(), response, InviteSessionType.PLAYBACK);
ssrcTransaction.setAllocatedSsrc(ssrcInfo.getAllocatedSsrc());
sessionManager.put(ssrcTransaction);
okEvent.response(event);
}, timeout);
@ -479,13 +482,14 @@ public class SIPCommander implements ISIPCommander {
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(),
response.getCallIdHeader().getCallId(), ssrcInfo.getApp(), ssrcInfo.getStream(), ssrc,
mediaServerItem.getId(), response, InviteSessionType.DOWNLOAD);
ssrcTransaction.setAllocatedSsrc(ssrcInfo.getAllocatedSsrc());
sessionManager.put(ssrcTransaction);
okEvent.response(event);
}, timeout);
}
@Override
public void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, String ySsrc, Device device, DeviceChannel channel,
public void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, Device device, DeviceChannel channel,
String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent,
SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException {
@ -531,20 +535,22 @@ public class SIPCommander implements ISIPCommander {
content.append("a=sendrecv\r\n");
content.append("a=rtpmap:8 PCMA/8000\r\n");
content.append("y=" + ySsrc + "\r\n");//ssrc
content.append("y=" + sendRtpItem.getSsrc() + "\r\n");//ssrc
// f字段:f= v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
content.append("f=v/////a/1/8/1" + "\r\n");
Request request = headerProvider.createInviteRequest(device, channel.getDeviceId(), content.toString(),
SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ySsrc, callIdHeader);
SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, sendRtpItem.getSsrc(), callIdHeader);
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> {
sessionManager.removeByStream(sendRtpItem.getApp(), sendRtpItem.getStream());
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrcToRelease());
errorEvent.response(e);
}), e -> {
// 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), MediaStreamUtil.GB28181_TALK,sendRtpItem.getApp(), stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.TALK);
ssrcTransaction.setAllocatedSsrc(sendRtpItem.getAllocatedSsrc());
sessionManager.put(ssrcTransaction);
okEvent.response(e);
}, timeout);

View File

@ -642,6 +642,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
String mediaServerId = sendRtpItem.getMediaServerId();
MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrcToRelease());
receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
}
SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem, channel);
@ -743,12 +744,14 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
callIdHeader);
sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> {
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
mediaServerService.releaseSsrc(mediaServer.getId(), ssrcInfo.getSsrcToRelease());
errorEvent.response(e);
}), e -> {
ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForPlatform(platform.getServerGBId(), channel.getGbId(),
callIdHeader.getCallId(), ssrcInfo.getApp(), stream, ssrcInfo.getSsrc(), mediaServer.getId(), response, InviteSessionType.BROADCAST);
ssrcTransaction.setAllocatedSsrc(ssrcInfo.getAllocatedSsrc());
sessionManager.put(ssrcTransaction);
okEvent.response(e);
});

View File

@ -131,6 +131,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
sendRtpServerService.deleteByCallId(callIdHeader.getCallId());
if (mediaServer != null) {
mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
if (userSetting.getUseCustomSsrcForParentInvite()) {
mediaServerService.releaseSsrc(mediaServer.getId(), sendRtpItem.getSsrcToRelease());
}
}
}
}else {
@ -140,6 +143,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
sendRtpServerService.delete(sendRtpItem);
mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
if (userSetting.getUseCustomSsrcForParentInvite()) {
mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrcToRelease());
}
}
if (sendRtpItem.getServerId().equals(userSetting.getServerId())) {
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
@ -245,6 +251,11 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
}
break;
}
// 释放ssrc
MediaServer mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId());
if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrcToRelease());
}
sessionManager.removeByCallId(ssrcTransaction.getCallId());
}
}

View File

@ -9,7 +9,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.service.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SendSsrcFactory;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@ -39,6 +39,7 @@ import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.security.SecureRandom;
import java.text.ParseException;
import java.util.List;
import java.util.Vector;
@ -102,7 +103,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
private UserSetting userSetting;
@Autowired
private SendSsrcFactory sendSsrcFactory;
private SSRCFactory ssrcFactory;
@Override
@ -174,14 +175,22 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// 点播成功 TODO 可以在此处检测cancel命令是否存在存在则不发送
if (userSetting.getUseCustomSsrcForParentInvite()) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
finalInviteInfo.setSsrc(sendSsrcFactory.getSendSsrc(
"Play".equalsIgnoreCase(finalInviteInfo.getSessionName()) ? "0" : "1"));
MediaServer mediaServer = mediaServerService.getOne(streamInfo.getMediaServer().getId());
if (mediaServer != null) {
String ssrc = "Play".equalsIgnoreCase(finalInviteInfo.getSessionName())
? ssrcFactory.getPlaySsrc(streamInfo.getMediaServer().getId())
: ssrcFactory.getPlayBackSsrc(streamInfo.getMediaServer().getId());
finalInviteInfo.setSsrc(ssrc);
finalInviteInfo.setAllocatedSsrc(ssrc);
finalInviteInfo.setAllocatedSsrcMediaServerId(streamInfo.getMediaServer().getId());
}
}
// 构建sendRTP内容
SendRtpInfo sendRtpItem = sendRtpServerService.createSendRtpInfo(streamInfo.getMediaServer(),
finalInviteInfo.getIp(), finalInviteInfo.getPort(), finalInviteInfo.getSsrc(), platform.getServerGBId(),
streamInfo.getApp(), streamInfo.getStream(),
channel.getGbId(), finalInviteInfo.isTcp(), platform.isRtcp());
sendRtpItem.setAllocatedSsrc(finalInviteInfo.getAllocatedSsrc());
if (finalInviteInfo.isTcp() && finalInviteInfo.isTcpActive()) {
sendRtpItem.setTcpActive(true);
}
@ -199,6 +208,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// 超时未收到Ack应该回复bye,当前等待时间为10秒
dynamicTask.startDelay(finalInviteInfo.getCallId(), () -> {
log.info("[Ack ] 等待超时, {}/{}", finalInviteInfo.getCallId(), channel.getGbDeviceId());
mediaServerService.releaseSsrc(streamInfo.getMediaServer().getId(), sendRtpItem.getSsrcToRelease());
// 回复bye
sendBye(platform, finalInviteInfo.getCallId());
}, 60 * 1000);
@ -239,6 +249,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
log.error("[命令发送失败] invite BAD_REQUEST: {}", sendException.getMessage());
}
} catch (PlayException e) {
releaseAllocatedSsrc(inviteInfo);
try {
responseAck(request, e.getCode(), e.getMsg());
} catch (SipException | InvalidArgumentException | ParseException sendException) {
@ -246,6 +257,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
} catch (Exception e) {
log.error("[Invite处理异常] ", e);
releaseAllocatedSsrc(inviteInfo);
try {
responseAck(request, Response.SERVER_INTERNAL_ERROR, "");
} catch (SipException | InvalidArgumentException | ParseException sendException) {
@ -254,6 +266,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
}
private void releaseAllocatedSsrc(InviteMessageInfo inviteInfo) {
if (inviteInfo == null || inviteInfo.getAllocatedSsrc() == null || inviteInfo.getAllocatedSsrcMediaServerId() == null) {
return;
}
mediaServerService.releaseSsrc(inviteInfo.getAllocatedSsrcMediaServerId(), inviteInfo.getAllocatedSsrc());
inviteInfo.setAllocatedSsrc(null);
inviteInfo.setAllocatedSsrcMediaServerId(null);
}
private InviteMessageInfo decode(RequestEvent evt) throws SdpException {
InviteMessageInfo inviteInfo = new InviteMessageInfo();
@ -478,7 +499,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
SessionDescription sdp = gb28181Sdp.getBaseSdb();
if (ObjectUtils.isEmpty(gb28181Sdp.getSsrc()) ) {
String ssrc = sendSsrcFactory.getSendSsrc("0");
String ssrc = Integer.toUnsignedString(new SecureRandom().nextInt());
log.warn("来自设备的Invite请求未携带SSRC生成随机ssrc: {}requesterId {}/{}", ssrc, inviteInfo.getRequesterId(), inviteInfo.getSourceChannelId());
gb28181Sdp.setSsrc(ssrc);
}

View File

@ -29,7 +29,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.lang.Thread;
/**
@ -168,9 +167,6 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
// 目前支持设备通道上线通知时和设备上线时向上级通知
int finalSn = sn;
Thread.startVirtualThread(() -> {
ReentrantLock lock = catalogDataCatch.getDeviceWriteLock(device.getDeviceId());
lock.lock();
try {
boolean resetChannelsResult = saveData(device, finalSn);
if (!resetChannelsResult) {
String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(deviceId, finalSn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(device.getDeviceId(), finalSn).size() + "";
@ -178,9 +174,6 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
} else {
catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, null);
}
} finally {
lock.unlock();
}
});
}
}

View File

@ -49,6 +49,8 @@ public interface IMediaServerService {
void removeCount(String mediaServerId);
void releaseSsrc(String mediaServerItemId, String ssrc);
void clearMediaServerForOnline();
void add(MediaServer mediaSerItem);

View File

@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.bean.TalkRtpInfo;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.bean.RecordInfo;
@ -56,6 +57,9 @@ import java.util.*;
@Service
public class MediaServerServiceImpl implements IMediaServerService {
@Autowired
private SSRCFactory ssrcFactory;
@Autowired
private UserSetting userSetting;
@ -146,6 +150,10 @@ public class MediaServerServiceImpl implements IMediaServerService {
if (ObjectUtils.isEmpty(mediaServer.getId())) {
continue;
}
// 更新
if (!ssrcFactory.hasMediaServerSSRC(mediaServer.getId())) {
ssrcFactory.initMediaServerSSRC(mediaServer.getId(), null);
}
// 查询redis是否存在此mediaServer
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId();
Boolean hasKey = redisTemplate.hasKey(key);
@ -221,11 +229,21 @@ public class MediaServerServiceImpl implements IMediaServerService {
return mediaNodeServerService.updateRtpServerSSRC(mediaServer, app, streamId, ssrc);
}
@Override
public void releaseSsrc(String mediaServerId, String ssrc) {
MediaServer mediaServer = getOne(mediaServerId);
if (mediaServer == null || ssrc == null) {
return;
}
ssrcFactory.releaseSsrc(mediaServerId, ssrc);
}
/**
* 媒体服务节点 重启后重置他的推流信息 TODO 给正在使用的设备发送停止命令
*/
@Override
public void clearRTPServer(MediaServer mediaServer) {
ssrcFactory.reset(mediaServer.getId());
}
@Override
@ -236,6 +254,12 @@ public class MediaServerServiceImpl implements IMediaServerService {
mediaServerMapper.add(mediaServer);
}
MediaServer mediaServerInRedis = getOne(mediaServer.getId());
if (mediaServerInRedis == null || !ssrcFactory.hasMediaServerSSRC(mediaServer.getId())) {
ssrcFactory.initMediaServerSSRC(mediaServer.getId(),null);
}
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId();
redisTemplate.opsForHash().put(key, mediaServer.getId(), mediaServer);
if (mediaServer.isStatus()) {

View File

@ -119,11 +119,11 @@ public class ZLMHttpHookListener {
ResultForOnPublish resultForOnPublish = mediaService.authenticatePublish(mediaServer, param.getApp(), param.getStream(), param.getParams());
if (resultForOnPublish != null) {
HookResultForOnPublish successResult = HookResultForOnPublish.getInstance(resultForOnPublish);
log.info("[ZLM HOOK]推流鉴权-允许-响应:{}->{}->>>>{}", param.getMediaServerId(), param, successResult);
log.info("[ZLM HOOK]推流鉴权 响应:{}->{}->>>>{}", param.getMediaServerId(), param, successResult);
return successResult;
}else {
HookResultForOnPublish fail = HookResultForOnPublish.Fail();
log.info("[ZLM HOOK]推流鉴权-拒绝-参数:{}->{}", param.getMediaServerId(), param);
log.info("[ZLM HOOK]推流鉴权 响应:{}->{}->>>>{}", param.getMediaServerId(), param, fail);
return fail;
}
}

View File

@ -82,7 +82,6 @@ public class ZLMMediaServerStatusManager {
offlineZlmPrimaryMap.put(mediaServerItem.getId(), mediaServerItem);
offlineZlmTimeMap.put(mediaServerItem.getId(), System.currentTimeMillis());
}
execute();
}
@Async

View File

@ -18,11 +18,7 @@ public class RTPServerParam {
private String app;
private String streamId;
/**
* 是否将ssrc传递给zlm做校验
*/
private boolean ssrcCheck;
/**
* 开启rtpServer时使用的ssrc
* 开启rtpServer时使用的ssrc开启rtpServer时会根据这个ssrc进行校验如果不填则不校验
*/
private Long ssrc;
private Integer port;

View File

@ -7,6 +7,7 @@ public class SSRCInfo {
private int port;
private String ssrc;
private String allocatedSsrc;
private String app;
private String stream;
@ -17,4 +18,8 @@ public class SSRCInfo {
this.stream = stream;
}
public String getSsrcToRelease() {
return allocatedSsrc;
}
}

View File

@ -28,6 +28,7 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@Slf4j
@ -90,7 +91,11 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
if (presetSSRC != null) {
ssrc = presetSSRC;
}else {
ssrc = playback ? ssrcFactory.getPlayBackSsrc(mediaServer) : ssrcFactory.getPlaySsrc(mediaServer);
if (playback) {
ssrc = ssrcFactory.getPlayBackSsrc(mediaServer.getId());
}else {
ssrc = ssrcFactory.getPlaySsrc(mediaServer.getId());
}
}
if (streamId == null) {
streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase();
@ -101,8 +106,10 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
}
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamId);
RTPServerParam rtpServerParam = new RTPServerParam(mediaServer, MediaStreamUtil.RTP_APP, streamId, Long.parseLong(ssrc), null, onlyAuto, disableAuto, false, tcpMode);
rtpServerParam.setSsrcCheck(ssrcCheck);
if (presetSSRC == null) {
ssrcInfo.setAllocatedSsrc(ssrc);
}
RTPServerParam rtpServerParam = new RTPServerParam(mediaServer, MediaStreamUtil.RTP_APP, streamId, ssrcCheck ? Long.parseLong(ssrc): 0L, null, onlyAuto, disableAuto, false, tcpMode);
int rtpServerPort = openCommonRTPServer(rtpServerParam, ((code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult();
@ -110,6 +117,11 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
openRTPServerResult.setSsrcInfo(ssrcInfo);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), openRTPServerResult);
} else {
// 释放ssrc
if (presetSSRC == null) {
ssrcFactory.releaseSsrc(mediaServer.getId(), ssrc);
ssrcInfo.setAllocatedSsrc(null);
}
OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult();
openRTPServerResult.setSsrcInfo(ssrcInfo);
callback.run(code, msg, openRTPServerResult);
@ -136,20 +148,32 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
if (presetSSRC != null) {
ssrc = presetSSRC;
}else {
ssrc = ssrcFactory.getPlaySsrc(mediaServer);
ssrc = ssrcFactory.getPlaySsrc(mediaServer.getId());
}
String streamId = String.format("%08x", Long.parseLong(ssrc)).toLowerCase();
String streamReplace = String.format("%s_%s", device.getDeviceId(), channel.getDeviceId());
String streamId;
String streamReplace = null;
if (mediaServer.isRtpEnable()) {
streamId = String.format("%s_%s", device.getDeviceId(), channel.getDeviceId());
}else {
streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase();
streamReplace = String.format("%s_%s", device.getDeviceId(), channel.getDeviceId());
}
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
if (device.isSsrcCheck() && tcpMode > 0) {
// 目前zlm不支持 tcp模式更新ssrc暂时关闭ssrc校验
log.warn("[开启国标点播RTP收流] 平台对接时下级可能自定义ssrc但是tcp模式zlm收流目前无法更新ssrc可能收流超时此时请使用udp收流或者关闭ssrc校验");
}
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamReplace);
openRtpServer(mediaServer, ssrcInfo, Long.parseLong(ssrc), !channel.isHasAudio(), false, tcpMode, callback, device.isSsrcCheck());
Long checkSsrc = device.isSsrcCheck() ? Long.parseLong(ssrc) : 0L;
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamReplace != null ? streamReplace : streamId);
if (presetSSRC == null) {
ssrcInfo.setAllocatedSsrc(ssrc);
}
openRtpServer(mediaServer, ssrcInfo, checkSsrc, !channel.isHasAudio(), false, tcpMode, callback);
addAuthenticateInfo(streamId, streamReplace, channel.isHasAudio(), record, null);
return ssrcInfo;
}
@ -167,19 +191,29 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
}
// 获取 mediaServer 可用的 ssrc
String ssrc = ssrcFactory.getPlayBackSsrc(mediaServer);
String ssrc = ssrcFactory.getPlayBackSsrc(mediaServer.getId());
String streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase();
String streamReplace = getPlaybackStream(device, channel, startTime, endTime);
String streamId;
String streamReplace = null;
if (mediaServer.isRtpEnable()) {
streamId = getPlaybackStream(device, channel, startTime, endTime);
}else {
streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase();
streamReplace = getPlaybackStream(device, channel, startTime, endTime);
}
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
if (device.isSsrcCheck() && tcpMode > 0) {
// 目前zlm不支持 tcp模式更新ssrc暂时关闭ssrc校验
log.warn("[开启国标回放RTP收流] 平台对接时下级可能自定义ssrc但是tcp模式zlm收流目前无法更新ssrc可能收流超时此时请使用udp收流或者关闭ssrc校验");
}
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamReplace);
openRtpServer(mediaServer, ssrcInfo, Long.parseLong(ssrc), !channel.isHasAudio(), false, tcpMode, callback, device.isSsrcCheck());
Long checkSsrc = device.isSsrcCheck() ? Long.parseLong(ssrc) : 0L;
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamReplace != null ? streamReplace : streamId);
ssrcInfo.setAllocatedSsrc(ssrc);
openRtpServer(mediaServer, ssrcInfo, checkSsrc, !channel.isHasAudio(), false, tcpMode, callback);
addAuthenticateInfo(streamId, streamReplace, channel.isHasAudio(), false,null);
return ssrcInfo;
}
@ -211,23 +245,22 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
// 获取 mediaServer 可用的 ssrc
String ssrc = ssrcFactory.getPlayBackSsrc(mediaServer);
String ssrc = ssrcFactory.getPlayBackSsrc(mediaServer.getId());
String streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase();
String streamReplace = String.format("%s_%s_%s_%s", device.getDeviceId(), channel.getDeviceId(),
startTime.replace("-", "").replace(":", "").replace(" ", ""),
endTime.replace("-", "").replace(":", "").replace(" ", ""));
if (device.isSsrcCheck() && tcpMode > 0) {
// 目前zlm不支持 tcp模式更新ssrc暂时关闭ssrc校验
log.warn("[开启国标录像下载RTP收流] 平台对接时下级可能自定义ssrc但是tcp模式zlm收流目前无法更新ssrc可能收流超时此时请使用udp收流或者关闭ssrc校验");
}
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamReplace);
openRtpServer(mediaServer, ssrcInfo, Long.parseLong(ssrc), !channel.isHasAudio(), false, tcpMode, callback, device.isSsrcCheck());
Long checkSsrc = device.isSsrcCheck() ? Long.parseLong(ssrc) : 0L;
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamId);
ssrcInfo.setAllocatedSsrc(ssrc);
openRtpServer(mediaServer, ssrcInfo, checkSsrc, !channel.isHasAudio(), false, tcpMode, callback);
long difference = DateUtil.getDifference(startTime, endTime) / 1000;
addAuthenticateInfo(streamId, streamReplace, channel.isHasAudio(), true, (int) difference);
addAuthenticateInfo(streamId, null, channel.isHasAudio(), true, (int) difference);
return ssrcInfo;
}
@ -258,23 +291,18 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
}
// 获取 mediaServer 可用的 ssrc
String ssrc = ssrcFactory.getPlaySsrc(mediaServer);
String ssrc = ssrcFactory.getPlaySsrc(mediaServer.getId());
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamId);
openRtpServer(mediaServer, ssrcInfo, Long.parseLong(ssrc), false, true, tcpMode, callback, false);
ssrcInfo.setAllocatedSsrc(ssrc);
openRtpServer(mediaServer, ssrcInfo, 0L, false, true, tcpMode, callback);
return ssrcInfo;
}
private void openRtpServer(MediaServer mediaServer, SSRCInfo ssrcInfo, Long checkSsrc, boolean disableAuto, boolean onlyAuto, int tcpMode,
ErrorCallback<OpenRTPServerResult> callback) {
openRtpServer(mediaServer, ssrcInfo, checkSsrc, disableAuto, onlyAuto, tcpMode, callback, false);
}
private void openRtpServer(MediaServer mediaServer, SSRCInfo ssrcInfo, Long checkSsrc, boolean disableAuto, boolean onlyAuto, int tcpMode,
ErrorCallback<OpenRTPServerResult> callback, boolean ssrcCheck) {
RTPServerParam rtpServerParam = new RTPServerParam(mediaServer, MediaStreamUtil.RTP_APP, ssrcInfo.getStream(), checkSsrc, null, onlyAuto, disableAuto, false, tcpMode);
rtpServerParam.setSsrcCheck(ssrcCheck);
int rtpServerPort = openCommonRTPServer(rtpServerParam, ((code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult();
@ -282,6 +310,11 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
openRTPServerResult.setSsrcInfo(ssrcInfo);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), openRTPServerResult);
} else {
// 释放ssrc
if (ssrcInfo.getAllocatedSsrc() != null) {
ssrcFactory.releaseSsrc(mediaServer.getId(), ssrcInfo.getAllocatedSsrc());
ssrcInfo.setAllocatedSsrc(null);
}
OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult();
openRTPServerResult.setSsrcInfo(ssrcInfo);
callback.run(code, msg, openRTPServerResult);
@ -308,9 +341,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
dynamicTask.startDelay(timeOutTaskKey, () -> {
// 收流超时
// 关闭收流端口
String closeStreamId = rtpServerParam.getMediaServer().isRtpEnable()
? String.format("%08x", rtpServerParam.getSsrc()) : rtpServerParam.getStreamId();
mediaServerService.closeRTPServer(rtpServerParam.getMediaServer(), rtpServerParam.getApp(), closeStreamId);
mediaServerService.closeRTPServer(rtpServerParam.getMediaServer(), rtpServerParam.getApp(), rtpServerParam.getStreamId());
subscribe.removeSubscribe(rtpHook);
callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
}, userSetting.getPlayTimeout());
@ -324,9 +355,8 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
int rtpServerPort;
if (rtpServerParam.getMediaServer().isRtpEnable()) {
String zlmStreamId = String.format("%08x", rtpServerParam.getSsrc());
Long checkSsrc = rtpServerParam.isSsrcCheck() ? rtpServerParam.getSsrc() : 0L;
rtpServerPort = mediaServerService.createRTPServer(rtpServerParam.getMediaServer(), rtpServerParam.getApp(), zlmStreamId, checkSsrc, rtpServerParam.getPort(), rtpServerParam.isOnlyAuto(),
rtpServerPort = mediaServerService.createRTPServer(rtpServerParam.getMediaServer(), rtpServerParam.getApp(), rtpServerParam.getStreamId(),
Objects.requireNonNullElse(rtpServerParam.getSsrc(), 0L), rtpServerParam.getPort(), rtpServerParam.isOnlyAuto(),
rtpServerParam.isDisableAudio(), rtpServerParam.isReUsePort(), rtpServerParam.getTcpMode());
} else {
rtpServerPort = rtpServerParam.getMediaServer().getRtpProxyPort();

View File

@ -5,7 +5,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.session.SendSsrcFactory;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
@ -25,7 +25,7 @@ import org.springframework.stereotype.Component;
public class RedisRpcSendRtpController extends RpcController {
@Autowired
private SendSsrcFactory sendSsrcFactory;
private SSRCFactory ssrcFactory;
@Autowired
private IMediaServerService mediaServerService;
@ -71,8 +71,10 @@ public class RedisRpcSendRtpController extends RpcController {
sendRtpItem.setServerId(userSetting.getServerId());
sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
if (sendRtpItem.getSsrc() == null) {
sendRtpItem.setSsrc(sendSsrcFactory.getSendSsrc(
"Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? "0" : "1"));
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setAllocatedSsrc(ssrc);
}
sendRtpServerService.update(sendRtpItem);
RedisRpcResponse response = request.getResponse();
@ -171,7 +173,9 @@ public class RedisRpcSendRtpController extends RpcController {
return;
}
sendRtpServerService.delete(sendRtpItem);
if (sendRtpItem.getMediaServerId() != null) {
mediaServerService.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrcToRelease());
}
}
}

View File

@ -8,7 +8,7 @@ import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.session.SendSsrcFactory;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
@ -31,7 +31,7 @@ import org.springframework.stereotype.Component;
public class RedisRpcStreamPushController extends RpcController {
@Autowired
private SendSsrcFactory sendSsrcFactory;
private SSRCFactory ssrcFactory;
@Autowired
private IMediaServerService mediaServerService;
@ -73,8 +73,10 @@ public class RedisRpcStreamPushController extends RpcController {
log.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
// 读取redis中的上级点播信息生成sendRtpItm发送出去
if (sendRtpItem.getSsrc() == null) {
sendRtpItem.setSsrc(sendSsrcFactory.getSendSsrc(
"Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? "0" : "1"));
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServer.getId()) : ssrcFactory.getPlayBackSsrc(mediaServer.getId());
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setAllocatedSsrc(ssrc);
}
sendRtpItem.setMediaServerId(mediaServer.getId());
sendRtpItem.setLocalIp(mediaServer.getSdpIp());
@ -91,8 +93,10 @@ public class RedisRpcStreamPushController extends RpcController {
log.info("[redis-rpc] 监听流上线,流已上线: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
// 读取redis中的上级点播信息生成sendRtpItm发送出去
if (sendRtpItem.getSsrc() == null) {
sendRtpItem.setSsrc(sendSsrcFactory.getSendSsrc(
"Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? "0" : "1"));
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(hookData.getMediaServer().getId()) : ssrcFactory.getPlayBackSsrc(hookData.getMediaServer().getId());
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setAllocatedSsrc(ssrc);
}
sendRtpItem.setMediaServerId(hookData.getMediaServer().getId());
sendRtpItem.setLocalIp(hookData.getMediaServer().getSdpIp());

View File

@ -12,7 +12,7 @@ import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelListForRpcParam;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.session.SendSsrcFactory;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
@ -44,7 +44,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
private HookSubscribe hookSubscribe;
@Autowired
private SendSsrcFactory sendSsrcFactory;
private SSRCFactory ssrcFactory;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@ -107,8 +107,10 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
// 读取redis中的上级点播信息生成sendRtpItm发送出去
if (sendRtpItem.getSsrc() == null) {
sendRtpItem.setSsrc(sendSsrcFactory.getSendSsrc(
"Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? "0" : "1"));
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(hookData.getMediaServer().getId()) : ssrcFactory.getPlayBackSsrc(hookData.getMediaServer().getId());
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setAllocatedSsrc(ssrc);
}
sendRtpItem.setMediaServerId(hookData.getMediaServer().getId());
sendRtpItem.setLocalIp(hookData.getMediaServer().getSdpIp());

View File

@ -99,8 +99,6 @@ media:
user-settings:
# 点播/录像回放 等待超时时间,单位:毫秒
play-timeout: 180000
# [可选] 多端口模式使用随机SSRCSSRC允许重复默认false
ssrc-random: false
# [可选] 自动点播, 使用固定流地址进行播放时,如果未点播则自动进行点播, 需要rtp.enable=true
auto-apply-play: true
# 推流直播是否录制

View File

@ -1,293 +0,0 @@
package com.genersoft.iot.vmp.gb28181.dao.provider;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Group;
import com.genersoft.iot.vmp.web.custom.bean.CameraGroup;
import com.genersoft.iot.vmp.web.custom.bean.Point;
import org.junit.jupiter.api.Test;
import java.util.*;
import static org.junit.jupiter.api.Assertions.*;
class ChannelProviderTest {
private final ChannelProvider provider = new ChannelProvider();
// ========== queryByGbDeviceIds ==========
@Test
void queryByGbDeviceIds_shouldUseBindVariables() {
Map<String, Object> params = new HashMap<>();
params.put("deviceIds", Arrays.asList("DEV001", "DEV002"));
String sql = provider.queryByGbDeviceIds(params);
assertTrue(sql.contains("#{deviceIds[0]}"), "should use #{deviceIds[0]}");
assertTrue(sql.contains("#{deviceIds[1]}"), "should use #{deviceIds[1]}");
assertFalse(sql.contains("'DEV001'"), "should not contain raw quoted value");
assertFalse(sql.contains("'DEV002'"), "should not contain raw quoted value");
}
@Test
void queryByGbDeviceIds_shouldNotQuoteBindVariables() {
Map<String, Object> params = new HashMap<>();
params.put("deviceIds", Collections.singletonList("INJECT' OR 1=1 --"));
String sql = provider.queryByGbDeviceIds(params);
assertTrue(sql.contains("#{deviceIds[0]}"), "should use bind variable for injection attempt");
assertFalse(sql.contains("1=1"), "should not contain injection payload in SQL");
}
// ========== queryByGroupList ==========
@Test
void queryByGroupList_shouldUseBindVariables() {
Map<String, Object> params = new HashMap<>();
Group g1 = new Group();
g1.setDeviceId("GRP001");
Group g2 = new Group();
g2.setDeviceId("GRP002");
params.put("groupList", Arrays.asList(g1, g2));
String sql = provider.queryByGroupList(params);
assertTrue(sql.contains("#{groupList[0].deviceId}"), "should use #{groupList[0].deviceId}");
assertTrue(sql.contains("#{groupList[1].deviceId}"), "should use #{groupList[1].deviceId}");
assertFalse(sql.contains("GRP001"), "should not contain raw deviceId");
assertFalse(sql.contains("GRP002"), "should not contain raw deviceId");
}
// ========== queryOnlineListsByGbDeviceIds ==========
@Test
void queryOnlineListsByGbDeviceIds_shouldUseBindVariables() {
Map<String, Object> params = new HashMap<>();
Device d1 = new Device();
d1.setId(101);
Device d2 = new Device();
d2.setId(102);
params.put("deviceList", Arrays.asList(d1, d2));
String sql = provider.queryOnlineListsByGbDeviceIds(params);
assertTrue(sql.contains("#{deviceList[0].id}"), "should use #{deviceList[0].id}");
assertTrue(sql.contains("#{deviceList[1].id}"), "should use #{deviceList[1].id}");
assertFalse(sql.contains("101"), "should not contain raw id");
assertFalse(sql.contains("102"), "should not contain raw id");
}
@Test
void queryOnlineListsByGbDeviceIds_withEmptyList_shouldNotHaveInClause() {
Map<String, Object> params = new HashMap<>();
params.put("deviceList", Collections.emptyList());
String sql = provider.queryOnlineListsByGbDeviceIds(params);
assertFalse(sql.contains("data_device_id in ("), "should not have IN clause when empty");
}
@Test
void queryOnlineListsByGbDeviceIds_withNullList_shouldNotHaveInClause() {
Map<String, Object> params = new HashMap<>();
params.put("deviceList", null);
String sql = provider.queryOnlineListsByGbDeviceIds(params);
assertFalse(sql.contains("data_device_id in ("), "should not have IN clause when null");
}
// ========== queryListWithChildForSy ==========
@Test
void queryListWithChildForSy_shouldUseBindVariables() {
Map<String, Object> params = new HashMap<>();
CameraGroup cg1 = new CameraGroup();
cg1.setDeviceId("CG001");
CameraGroup cg2 = new CameraGroup();
cg2.setDeviceId("CG002");
params.put("groupList", Arrays.asList(cg1, cg2));
String sql = provider.queryListWithChildForSy(params);
assertTrue(sql.contains("#{groupList[0].deviceId}"), "should use #{groupList[0].deviceId}");
assertTrue(sql.contains("#{groupList[1].deviceId}"), "should use #{groupList[1].deviceId}");
assertFalse(sql.contains("'CG001'"), "should not contain raw quoted value");
}
@Test
void queryListWithChildForSy_withQuery_shouldUseBindVariable() {
Map<String, Object> params = new HashMap<>();
params.put("query", "search-term");
params.put("groupList", Collections.singletonList(new CameraGroup()));
String sql = provider.queryListWithChildForSy(params);
assertTrue(sql.contains("#{query}"), "should use #{query} bind variable");
assertFalse(sql.contains("search-term"), "should not contain raw query");
}
@Test
void queryListWithChildForSy_withSort_shouldUseWhitelist() {
Map<String, Object> params = new HashMap<>();
params.put("groupList", Collections.singletonList(new CameraGroup()));
params.put("sortName", "gbId");
params.put("order", true);
String sql = provider.queryListWithChildForSy(params);
assertTrue(sql.contains("order by gb_id"), "should sort by gb_id");
assertTrue(sql.contains("ASC"), "should be ascending");
}
@Test
void queryListWithChildForSy_withSortDesc_shouldUseDesc() {
Map<String, Object> params = new HashMap<>();
params.put("groupList", Collections.singletonList(new CameraGroup()));
params.put("sortName", "gbId");
params.put("order", false);
String sql = provider.queryListWithChildForSy(params);
assertTrue(sql.contains("DESC"), "should be descending");
}
// ========== queryListInBox ==========
@Test
void queryListInBox_shouldUseBindVariables() {
Map<String, Object> params = new HashMap<>();
CameraGroup cg = new CameraGroup();
cg.setDeviceId("BOX001");
params.put("groupList", Collections.singletonList(cg));
params.put("level", 3);
String sql = provider.queryListInBox(params);
assertTrue(sql.contains("#{groupList[0].deviceId}"), "should use bind variable");
assertFalse(sql.contains("'BOX001'"), "should not contain raw value");
assertTrue(sql.contains("#{level}"), "should use #{level} bind variable");
assertTrue(sql.contains("#{minLongitude}"), "should use #{minLongitude}");
assertTrue(sql.contains("#{maxLatitude}"), "should use #{maxLatitude}");
}
// ========== queryListInCircleForMysql ==========
@Test
void queryListInCircleForMysql_shouldUseBindVariablesForGeometry() {
Map<String, Object> params = new HashMap<>();
CameraGroup cg = new CameraGroup();
cg.setDeviceId("CIRCLE001");
params.put("groupList", Collections.singletonList(cg));
params.put("centerLongitude", 116.397);
params.put("centerLatitude", 39.908);
params.put("radius", 1000);
String sql = provider.queryListInCircleForMysql(params);
assertTrue(sql.contains("#{centerLongitude}"), "should use #{centerLongitude} bind variable");
assertTrue(sql.contains("#{centerLatitude}"), "should use #{centerLatitude} bind variable");
assertTrue(sql.contains("#{radius}"), "should use #{radius} bind variable");
assertFalse(sql.contains("116.397"), "should not contain raw longitude");
assertFalse(sql.contains("39.908"), "should not contain raw latitude");
assertTrue(sql.contains("CONCAT('point(', #{centerLongitude}, ' ', #{centerLatitude}, ')')"),
"should build WKT via CONCAT with bind variables");
}
// ========== queryListInCircleForKingBase ==========
@Test
void queryListInCircleForKingBase_shouldUseBindVariablesForGeometry() {
Map<String, Object> params = new HashMap<>();
CameraGroup cg = new CameraGroup();
cg.setDeviceId("CIRCLE002");
params.put("groupList", Collections.singletonList(cg));
params.put("centerLongitude", 121.473);
params.put("centerLatitude", 31.230);
params.put("radius", 500);
String sql = provider.queryListInCircleForKingBase(params);
assertTrue(sql.contains("#{centerLongitude}"), "should use #{centerLongitude}");
assertTrue(sql.contains("#{centerLatitude}"), "should use #{centerLatitude}");
assertTrue(sql.contains("#{radius}"), "should use #{radius}");
assertFalse(sql.contains("121.473"), "should not contain raw longitude");
assertFalse(sql.contains("31.230"), "should not contain raw latitude");
assertTrue(sql.contains("CONCAT('point(', #{centerLongitude}, ' ', #{centerLatitude}, ')')"),
"should build WKT via CONCAT with bind variables");
}
// ========== queryListInPolygonForMysql ==========
@Test
void queryListInPolygonForMysql_shouldUseBindVariablesForPoints() {
Map<String, Object> params = new HashMap<>();
CameraGroup cg = new CameraGroup();
cg.setDeviceId("POLY001");
params.put("groupList", Collections.singletonList(cg));
List<Point> points = new ArrayList<>();
Point p1 = new Point();
p1.setLng(116.0);
p1.setLat(39.0);
Point p2 = new Point();
p2.setLng(117.0);
p2.setLat(40.0);
points.add(p1);
points.add(p2);
params.put("pointList", points);
String sql = provider.queryListInPolygonForMysql(params);
assertTrue(sql.contains("#{pointList[0].lng}"), "should use #{pointList[0].lng}");
assertTrue(sql.contains("#{pointList[0].lat}"), "should use #{pointList[0].lat}");
assertTrue(sql.contains("#{pointList[1].lng}"), "should use #{pointList[1].lng}");
assertTrue(sql.contains("#{pointList[1].lat}"), "should use #{pointList[1].lat}");
assertFalse(sql.contains("116.0"), "should not contain raw lng");
assertFalse(sql.contains("117.0"), "should not contain raw lat");
assertTrue(sql.contains("CONCAT('POLYGON(('"), "should use CONCAT to build polygon WKT");
}
// ========== queryListInPolygonForKingBase ==========
@Test
void queryListInPolygonForKingBase_shouldUseBindVariablesForPoints() {
Map<String, Object> params = new HashMap<>();
CameraGroup cg = new CameraGroup();
cg.setDeviceId("POLY002");
params.put("groupList", Collections.singletonList(cg));
List<Point> points = new ArrayList<>();
Point p1 = new Point();
p1.setLng(116.0);
p1.setLat(39.0);
points.add(p1);
params.put("pointList", points);
String sql = provider.queryListInPolygonForKingBase(params);
assertTrue(sql.contains("#{pointList[0].lng}"), "should use #{pointList[0].lng}");
assertTrue(sql.contains("#{pointList[0].lat}"), "should use #{pointList[0].lat}");
assertFalse(sql.contains("116.0"), "should not contain raw lng");
assertFalse(sql.contains("39.0"), "should not contain raw lat");
assertTrue(sql.contains("ST_MakePoint"), "should use KingBase specific function");
}
// ========== queryListInCircleForMysql with injection attempt ==========
@Test
void queryListInCircleForMysql_shouldNotContainInjectionPayload() {
Map<String, Object> params = new HashMap<>();
CameraGroup cg = new CameraGroup();
cg.setDeviceId("NORMAL");
params.put("groupList", Collections.singletonList(cg));
params.put("centerLongitude", "0) OR 1=1 -- ");
params.put("centerLatitude", "0");
params.put("radius", 1000);
String sql = provider.queryListInCircleForMysql(params);
assertTrue(sql.contains("#{centerLongitude}"), "should use bind variable for injection payload");
assertFalse(sql.contains("1=1"), "should not contain 1=1 in SQL text");
assertFalse(sql.contains("OR 1=1"), "should not contain injection");
}
// ========== queryByGbDeviceIds single element ==========
@Test
void queryByGbDeviceIds_withSingleElement() {
Map<String, Object> params = new HashMap<>();
params.put("deviceIds", Collections.singletonList("SINGLE01"));
String sql = provider.queryByGbDeviceIds(params);
assertEquals(1, countOccurrences(sql, "#{deviceIds[0]}"),
"should have exactly one bind variable for single element");
assertFalse(sql.contains("#{deviceIds[0]},"), "should not have trailing comma in IN clause");
assertFalse(sql.contains(",#{deviceIds[0]}"), "should not have leading comma in IN clause");
}
// ========== helper ==========
private int countOccurrences(String str, String substr) {
int count = 0;
int idx = 0;
while ((idx = str.indexOf(substr, idx)) != -1) {
count++;
idx += substr.length();
}
return count;
}
}

View File

@ -1,177 +0,0 @@
package com.genersoft.iot.vmp.gb28181.dao.provider;
import org.junit.jupiter.api.Test;
import java.util.*;
import static org.junit.jupiter.api.Assertions.*;
class DeviceChannelProviderTest {
private final DeviceChannelProvider provider = new DeviceChannelProvider();
@Test
void queryChannels_withChannelIds_shouldUseBindVariables() {
Map<String, Object> params = new HashMap<>();
params.put("channelIds", Arrays.asList("CH001", "CH002", "CH003"));
String sql = provider.queryChannels(params);
assertTrue(sql.contains("#{channelIds[0]}"), "should use #{channelIds[0]}");
assertTrue(sql.contains("#{channelIds[1]}"), "should use #{channelIds[1]}");
assertTrue(sql.contains("#{channelIds[2]}"), "should use #{channelIds[2]}");
assertFalse(sql.contains("CH001"), "should not contain raw channel id");
assertFalse(sql.contains("CH002"), "should not contain raw channel id");
assertTrue(sql.contains("dc.device_id in ("), "should have IN clause");
}
@Test
void queryChannels_withoutChannelIds_shouldNotContainInClause() {
Map<String, Object> params = new HashMap<>();
String sql = provider.queryChannels(params);
assertFalse(sql.contains("device_id in ("), "should not have IN clause when no channelIds");
assertTrue(sql.contains("ORDER BY"), "should have ORDER BY");
}
@Test
void queryChannels_withEmptyChannelIds_shouldNotContainInClause() {
Map<String, Object> params = new HashMap<>();
params.put("channelIds", Collections.emptyList());
String sql = provider.queryChannels(params);
assertFalse(sql.contains("device_id in ("), "should not have IN clause when channelIds empty");
}
@Test
void queryChannels_withDataDeviceId_shouldUseBindVariable() {
Map<String, Object> params = new HashMap<>();
params.put("dataDeviceId", 42);
String sql = provider.queryChannels(params);
assertTrue(sql.contains("#{dataDeviceId}"), "should use #{dataDeviceId}");
}
@Test
void queryChannels_withQuery_shouldUseBindVariable() {
Map<String, Object> params = new HashMap<>();
params.put("query", "test");
String sql = provider.queryChannels(params);
assertTrue(sql.contains("#{query}"), "should use #{query} bind variable");
assertFalse(sql.contains("'test'"), "should not contain raw query value");
}
@Test
void queryChannels_withOnline_shouldFilterStatus() {
Map<String, Object> params = new HashMap<>();
params.put("online", true);
String sql = provider.queryChannels(params);
assertTrue(sql.contains("'ON'"), "should filter for ON status");
assertFalse(sql.contains("'OFF'"), "should not filter for OFF status");
}
@Test
void queryChannels_withOffline_shouldFilterStatus() {
Map<String, Object> params = new HashMap<>();
params.put("online", false);
String sql = provider.queryChannels(params);
assertTrue(sql.contains("'OFF'"), "should filter for OFF status");
assertFalse(sql.contains("'ON'"), "should not filter for ON status");
}
@Test
void queryChannels_withBusinessGroupId_shouldFilter() {
Map<String, Object> params = new HashMap<>();
params.put("businessGroupId", "group-1");
String sql = provider.queryChannels(params);
assertTrue(sql.contains("#{businessGroupId}"), "should use bind variable");
}
@Test
void queryChannelsByDeviceDbId_shouldUseBindVariable() {
Map<String, Object> params = new HashMap<>();
params.put("dataDeviceId", 99);
String sql = provider.queryChannelsByDeviceDbId(params);
assertTrue(sql.contains("#{dataDeviceId}"), "should use #{dataDeviceId}");
}
@Test
void queryChannelsByDeviceDbId_shouldFilterByDataType() {
Map<String, Object> params = new HashMap<>();
params.put("dataDeviceId", 1);
String sql = provider.queryChannelsByDeviceDbId(params);
assertTrue(sql.contains("data_type = 1"), "should filter by GB28181 data type");
}
@Test
void getOne_shouldUseBindVariable() {
Map<String, Object> params = new HashMap<>();
params.put("id", 123);
String sql = provider.getOne(params);
assertTrue(sql.contains("#{id}"), "should use #{id} bind variable");
assertTrue(sql.contains("where"), "should have WHERE clause");
assertTrue(sql.contains("#{id}"), "should have bind variable");
}
@Test
void getOneByDeviceId_shouldUseBindVariables() {
Map<String, Object> params = new HashMap<>();
params.put("dataDeviceId", 10);
params.put("channelId", "CH999");
String sql = provider.getOneByDeviceId(params);
assertTrue(sql.contains("#{dataDeviceId}"), "should use #{dataDeviceId}");
assertTrue(sql.contains("#{channelId}"), "should use #{channelId}");
}
@Test
void queryByDeviceId_shouldUseBindVariable() {
Map<String, Object> params = new HashMap<>();
params.put("gbDeviceId", "GB-TEST-123");
String sql = provider.queryByDeviceId(params);
assertTrue(sql.contains("#{gbDeviceId}"), "should use #{gbDeviceId}");
}
@Test
void queryById_shouldUseBindVariable() {
Map<String, Object> params = new HashMap<>();
params.put("gbId", 456);
String sql = provider.queryById(params);
assertTrue(sql.contains("#{gbId}"), "should use #{gbId}");
}
@Test
void queryList_withQuery_shouldUseBindVariable() {
Map<String, Object> params = new HashMap<>();
params.put("query", "search-term");
String sql = provider.queryList(params);
assertTrue(sql.contains("#{query}"), "should use #{query} bind variable");
assertFalse(sql.contains("search-term"), "should not contain raw query value");
}
@Test
void queryList_withOnline_shouldFilter() {
Map<String, Object> params = new HashMap<>();
params.put("online", true);
String sql = provider.queryList(params);
assertTrue(sql.contains("'ON'"), "should filter for ON");
}
@Test
void queryList_withHasCivilCode_shouldFilter() {
Map<String, Object> params = new HashMap<>();
params.put("hasCivilCode", true);
String sql = provider.queryList(params);
assertTrue(sql.contains("civil_code) is not null"), "should filter for not null civil code");
}
@Test
void queryList_withHasGroup_shouldFilter() {
Map<String, Object> params = new HashMap<>();
params.put("hasGroup", true);
String sql = provider.queryList(params);
assertTrue(sql.contains("parent_id) is not null"), "should filter for not null parent");
}
@Test
void queryChannels_withHasStream_shouldFilter() {
Map<String, Object> params = new HashMap<>();
params.put("hasStream", true);
String sql = provider.queryChannels(params);
assertTrue(sql.contains("stream_id IS NOT NULL"), "should filter for not null stream_id");
}
}

View File

@ -1,176 +0,0 @@
package com.genersoft.iot.vmp.gb28181.session;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.springframework.test.util.ReflectionTestUtils;
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.*;
class SSRCFactoryTest {
private SSRCFactory ssrcFactory;
private static final String DOMAIN_PART = "20000";
private static final String SERVER_ID = "test-server";
@BeforeEach
void setUp() throws Exception {
ssrcFactory = new SSRCFactory();
ReflectionTestUtils.setField(ssrcFactory, "domainPart", DOMAIN_PART);
Field schedulerField = SSRCFactory.class.getDeclaredField("scheduler");
schedulerField.setAccessible(true);
java.util.concurrent.ScheduledExecutorService scheduler =
(java.util.concurrent.ScheduledExecutorService) schedulerField.get(ssrcFactory);
scheduler.shutdownNow();
}
@Test
void getPlaySsrc_shouldReturnCorrectFormat() {
String ssrc = ssrcFactory.getPlaySsrc(SERVER_ID);
assertNotNull(ssrc);
assertEquals(10, ssrc.length(), "SSRC should be 10 characters: prefix(1) + domain(5) + seq(4)");
assertTrue(ssrc.startsWith("0"), "Play SSRC should start with '0'");
assertTrue(ssrc.substring(1).startsWith(DOMAIN_PART), "SSRC should contain domain part");
assertTrue(ssrc.matches("0" + DOMAIN_PART + "\\d{4}"), "SSRC format: 0" + DOMAIN_PART + "NNNN");
}
@Test
void getPlayBackSsrc_shouldReturnCorrectFormat() {
String ssrc = ssrcFactory.getPlayBackSsrc(SERVER_ID);
assertNotNull(ssrc);
assertEquals(10, ssrc.length(), "SSRC should be 10 characters: prefix(1) + domain(5) + seq(4)");
assertTrue(ssrc.startsWith("1"), "PlayBack SSRC should start with '1'");
assertTrue(ssrc.substring(1).startsWith(DOMAIN_PART), "SSRC should contain domain part");
assertTrue(ssrc.matches("1" + DOMAIN_PART + "\\d{4}"), "SSRC format: 1" + DOMAIN_PART + "NNNN");
}
@Test
void allocations_withinSameServer_shouldBeUnique() {
Set<String> allocated = new HashSet<>();
for (int i = 0; i < 1000; i++) {
String ssrc = ssrcFactory.getPlaySsrc(SERVER_ID);
assertNotNull(ssrc, "Should allocate SSRC #" + i);
assertTrue(allocated.add(ssrc), "SSRC should be unique: " + ssrc);
}
assertEquals(1000, allocated.size());
}
@Test
void allocations_forDifferentServers_shouldBeIndependent() {
String serverA = "server-a";
String serverB = "server-b";
for (int i = 0; i < 10000; i++) {
assertNotNull(ssrcFactory.getPlaySsrc(serverA), "Server A should allocate SSRC #" + i);
}
assertNull(ssrcFactory.getPlaySsrc(serverA), "Server A should be exhausted");
for (int i = 0; i < 1000; i++) {
assertNotNull(ssrcFactory.getPlaySsrc(serverB), "Server B should allocate SSRC #" + i);
}
}
@Test
void exhaustion_shouldReturnNull() {
for (int i = 0; i < 10000; i++) {
assertNotNull(ssrcFactory.getPlaySsrc(SERVER_ID), "iteration " + i);
}
assertNull(ssrcFactory.getPlaySsrc(SERVER_ID), "Should return null when exhausted");
assertNull(ssrcFactory.getPlayBackSsrc(SERVER_ID), "Should return null for PlayBack too");
}
@Test
@Disabled("Needs mocked mediaServerService for ZLM query")
void rebuild_shouldResetUsage() {
for (int i = 0; i < 500; i++) {
ssrcFactory.getPlaySsrc(SERVER_ID);
}
ssrcFactory.rebuild();
for (int i = 0; i < 500; i++) {
String ssrc = ssrcFactory.getPlaySsrc(SERVER_ID);
assertNotNull(ssrc, "After rebuild should allocate SSRC #" + i);
}
}
@Test
void allocateAll_shouldUseAll10000Slots() {
Set<String> allocated = new HashSet<>();
for (int i = 0; i < 10000; i++) {
String ssrc = ssrcFactory.getPlaySsrc(SERVER_ID);
assertNotNull(ssrc, "Should allocate at iteration " + i);
allocated.add(ssrc);
}
assertEquals(10000, allocated.size(), "All 10000 slots should be unique");
}
@Test
void twoPrefixes_shareSamePool() throws Exception {
for (int i = 0; i < 5000; i++) {
assertNotNull(ssrcFactory.getPlaySsrc(SERVER_ID), "play #" + i);
assertNotNull(ssrcFactory.getPlayBackSsrc(SERVER_ID), "playback #" + i);
}
Field usedMapField = SSRCFactory.class.getDeclaredField("usedMap");
usedMapField.setAccessible(true);
java.util.concurrent.ConcurrentHashMap<String, java.util.BitSet> usedMap =
(java.util.concurrent.ConcurrentHashMap<String, java.util.BitSet>) usedMapField.get(ssrcFactory);
java.util.BitSet bits = usedMap.get(SERVER_ID);
assertNotNull(bits);
assertEquals(10000, bits.cardinality(), "All 10000 bits should be set");
}
@Test
void multipleServers_shouldNotAffectEachOther() {
String server1 = "server-1";
String server2 = "server-2";
String server3 = "server-3";
for (int i = 0; i < 10000; i++) {
ssrcFactory.getPlaySsrc(server1);
}
assertNull(ssrcFactory.getPlaySsrc(server1));
assertNotNull(ssrcFactory.getPlaySsrc(server2));
assertNotNull(ssrcFactory.getPlaySsrc(server3));
for (int i = 0; i < 100; i++) {
ssrcFactory.getPlaySsrc(server2);
ssrcFactory.getPlaySsrc(server3);
}
assertNull(ssrcFactory.getPlaySsrc(server1));
}
@Test
void linearProbe_skipsUsedSlots() throws Exception {
Field usedMapField = SSRCFactory.class.getDeclaredField("usedMap");
usedMapField.setAccessible(true);
java.util.concurrent.ConcurrentHashMap<String, java.util.BitSet> usedMap =
(java.util.concurrent.ConcurrentHashMap<String, java.util.BitSet>) usedMapField.get(ssrcFactory);
java.util.BitSet bits = new java.util.BitSet(10000);
for (int i = 0; i < 100; i++) {
bits.set(i);
}
usedMap.put(SERVER_ID, bits);
String ssrc = ssrcFactory.getPlaySsrc(SERVER_ID);
assertNotNull(ssrc, "Should find a free slot via linear probe");
int suffix = Integer.parseInt(ssrc.substring(6));
assertTrue(suffix >= 100, "Should skip used slots 0-99, got suffix " + suffix);
}
@Test
void ssrc_shouldBeDifferentEachCall() {
Set<String> results = new HashSet<>();
for (int i = 0; i < 100; i++) {
results.add(ssrcFactory.getPlaySsrc(SERVER_ID));
}
assertEquals(100, results.size(), "All 100 calls should return different SSRCs");
}
}

View File

@ -29,7 +29,6 @@
:error="videoError"
:message="videoError"
:has-audio="hasAudio"
:show-button="true"
fluent
autoplay
live

View File

@ -155,8 +155,7 @@ create table IF NOT EXISTS wvp_device_channel
gps_speed double precision,
gps_altitude double precision,
gps_direction double precision,
constraint uk_wvp_unique_channel unique (gb_device_id),
constraint uk_device_channel_source unique (data_device_id, device_id)
constraint uk_wvp_unique_channel unique (gb_device_id)
);
create table IF NOT EXISTS wvp_media_server

View File

@ -162,8 +162,7 @@ create table IF NOT EXISTS wvp_device_channel
enable_broadcast integer default 0 COMMENT '是否支持广播',
index (data_type),
index (data_device_id),
constraint uk_wvp_unique_channel unique (gb_device_id),
constraint uk_device_channel_source unique (data_device_id, device_id)
constraint uk_wvp_unique_channel unique (gb_device_id)
);
-- 媒体服务器(如 ZLM节点信息

View File

@ -215,8 +215,7 @@ create table IF NOT EXISTS wvp_device_channel
gps_altitude double precision,
gps_direction double precision,
enable_broadcast integer default 0,
constraint uk_wvp_unique_channel unique (gb_device_id),
constraint uk_device_channel_source unique (data_device_id, device_id)
constraint uk_wvp_unique_channel unique (gb_device_id)
);
COMMENT ON TABLE wvp_device_channel IS '保存设备下的通道信息以及扩展属性';
COMMENT ON COLUMN wvp_device_channel.id IS '主键ID';

View File

@ -196,29 +196,7 @@ call wvp_20260417();
DROP PROCEDURE wvp_20260417;
DELIMITER ;
/*
* 20260521 wvp_device_channel唯一约束
*/
DELIMITER //
CREATE PROCEDURE `wvp_20260521`()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.STATISTICS
WHERE TABLE_SCHEMA = (SELECT DATABASE())
AND TABLE_NAME = 'wvp_device_channel'
AND INDEX_NAME = 'uk_device_channel_source')
THEN
-- 先清理可能的重复数据
DELETE t1 FROM wvp_device_channel t1
INNER JOIN wvp_device_channel t2
WHERE t1.id < t2.id
AND t1.data_device_id = t2.data_device_id
AND t1.device_id = t2.device_id;
ALTER TABLE wvp_device_channel ADD UNIQUE INDEX uk_device_channel_source (data_device_id, device_id);
END IF;
END; //
call wvp_20260521();
DROP PROCEDURE wvp_20260521;
DELIMITER ;

View File

@ -68,26 +68,6 @@ COMMENT ON COLUMN wvp_alarm.alarm_type IS '报警类别';
COMMENT ON COLUMN wvp_alarm.alarm_time IS '报警时间';
/*
* 20260521 wvp_device_channel唯一约束
*/
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_indexes
WHERE tablename = 'wvp_device_channel'
AND indexname = 'uk_device_channel_source') THEN
-- 先清理可能的重复数据
DELETE FROM wvp_device_channel t1
USING wvp_device_channel t2
WHERE t1.id < t2.id
AND t1.data_device_id = t2.data_device_id
AND t1.device_id = t2.device_id;
ALTER TABLE wvp_device_channel ADD CONSTRAINT uk_device_channel_source UNIQUE (data_device_id, device_id);
END IF;
END;
$$;
/*
* 20260417 wvp_device_mobile_position从专属国标的位置记录表
*/