合并主线的发流端口管理逻辑

pull/952/head
648540858 2023-07-10 14:30:59 +08:00
parent 885842249f
commit 04e7f48fde
16 changed files with 234 additions and 215 deletions

View File

@ -0,0 +1,2 @@
alter table media_server
add sendRtpPortRange varchar(50) not null;

View File

@ -75,6 +75,9 @@ public class MediaConfig{
@Value("${media.rtp.port-range}") @Value("${media.rtp.port-range}")
private String rtpPortRange; private String rtpPortRange;
@Value("${media.rtp.send-port-range}")
private String rtpSendPortRange;
@Value("${media.record-assist-port:0}") @Value("${media.record-assist-port:0}")
private Integer recordAssistPort = 0; private Integer recordAssistPort = 0;
@ -206,6 +209,7 @@ public class MediaConfig{
mediaServerItem.setSecret(secret); mediaServerItem.setSecret(secret);
mediaServerItem.setRtpEnable(rtpEnable); mediaServerItem.setRtpEnable(rtpEnable);
mediaServerItem.setRtpPortRange(rtpPortRange); mediaServerItem.setRtpPortRange(rtpPortRange);
mediaServerItem.setSendRtpPortRange(rtpSendPortRange);
mediaServerItem.setRecordAssistPort(recordAssistPort); mediaServerItem.setRecordAssistPort(recordAssistPort);
mediaServerItem.setHookAliveInterval(30.00f); mediaServerItem.setHookAliveInterval(30.00f);
@ -222,4 +226,11 @@ public class MediaConfig{
return false; return false;
} }
public String getRtpSendPortRange() {
return rtpSendPortRange;
}
public void setRtpSendPortRange(String rtpSendPortRange) {
this.rtpSendPortRange = rtpSendPortRange;
}
} }

View File

@ -140,15 +140,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader); startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader);
}); });
}else { }else {
// 如果是非严格模式,需要关闭端口占用 JSONObject startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
JSONObject startSendRtpStreamResult = null;
if (sendRtpItem.getLocalPort() != 0) {
if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) {
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
}
}else {
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
}
if (startSendRtpStreamResult != null) { if (startSendRtpStreamResult != null) {
startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader); startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader);
} }

View File

@ -349,9 +349,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
logger.info("[上级Invite] {}, 平台:{} 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc); logger.info("[上级Invite] {}, 平台:{} 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> { device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
});
if (tcpActive != null) { if (tcpActive != null) {
sendRtpItem.setTcpActive(tcpActive); sendRtpItem.setTcpActive(tcpActive);
@ -553,9 +551,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (streamReady) { if (streamReady) {
// 自平台内容 // 自平台内容
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{ gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
});
if (sendRtpItem == null) { if (sendRtpItem == null) {
logger.warn("服务器端口资源不足"); logger.warn("服务器端口资源不足");
@ -594,9 +590,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (streamReady) { if (streamReady) {
// 自平台内容 // 自平台内容
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{ gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
});
if (sendRtpItem == null) { if (sendRtpItem == null) {
logger.warn("服务器端口资源不足"); logger.warn("服务器端口资源不足");
@ -713,9 +707,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
dynamicTask.stop(callIdHeader.getCallId()); dynamicTask.stop(callIdHeader.getCallId());
if (serverId.equals(userSetting.getServerId())) { if (serverId.equals(userSetting.getServerId())) {
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
app, stream, channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> { app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
});
if (sendRtpItem == null) { if (sendRtpItem == null) {
logger.warn("上级点时创建sendRTPItem失败可能是服务器端口资源不足"); logger.warn("上级点时创建sendRTPItem失败可能是服务器端口资源不足");

View File

@ -0,0 +1,55 @@
package com.genersoft.iot.vmp.media.zlm;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.zlm.dto.MediaSendRtpPortInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Component
public class SendRtpPortManager {
private final static Logger logger = LoggerFactory.getLogger(SendRtpPortManager.class);
@Autowired
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private final String KEY = "VM_MEDIA_SEND_RTP_PORT_RANGE_";
public void initServerPort(String mediaServerId, int startPort, int endPort){
String key = KEY + userSetting.getServerId() + "_" + mediaServerId;
MediaSendRtpPortInfo mediaSendRtpPortInfo = new MediaSendRtpPortInfo(startPort, endPort, mediaServerId);
redisTemplate.opsForValue().set(key, mediaSendRtpPortInfo);
}
public int getNextPort(String mediaServerId) {
String key = KEY + userSetting.getServerId() + "_" + mediaServerId;
MediaSendRtpPortInfo mediaSendRtpPortInfo = (MediaSendRtpPortInfo)redisTemplate.opsForValue().get(key);
if (mediaSendRtpPortInfo == null) {
logger.warn("[发送端口管理] 获取{}的发送端口时未找到端口信息", mediaSendRtpPortInfo);
return 0;
}
int port;
if (mediaSendRtpPortInfo.getCurrent() %2 != 0) {
port = mediaSendRtpPortInfo.getCurrent() + 1;
}else {
port = mediaSendRtpPortInfo.getCurrent() + 2;
}
if (port > mediaSendRtpPortInfo.getEnd()) {
if (mediaSendRtpPortInfo.getStart() %2 != 0) {
port = mediaSendRtpPortInfo.getStart() + 1;
}else {
port = mediaSendRtpPortInfo.getStart();
}
}
mediaSendRtpPortInfo.setCurrent(port);
redisTemplate.opsForValue().set(key, mediaSendRtpPortInfo);
return port;
}
}

View File

@ -23,7 +23,6 @@ import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.*; import com.genersoft.iot.vmp.vmanager.bean.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -223,9 +222,6 @@ public class ZLMHttpHookListener {
HookResultForOnPublish result = HookResultForOnPublish.SUCCESS(); HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
if (!"rtp".equals(param.getApp())) {
result.setEnable_audio(true);
}
taskExecutor.execute(() -> { taskExecutor.execute(() -> {
ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json); ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
@ -259,20 +255,6 @@ public class ZLMHttpHookListener {
} }
} }
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "*";
// 将信息写入redis中以备后用
List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey);
if (scan.size()>0) {
for (Object o : scan) {
String key = (String) o;
OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
if (otherRtpSendInfo != null && otherRtpSendInfo.getStream().equalsIgnoreCase(param.getStream())) {
result.setEnable_audio(true);
result.setEnable_mp4(true);
}
}
}
if (mediaInfo.getRecordAssistPort() > 0 && userSetting.getRecordPath() == null) { if (mediaInfo.getRecordAssistPort() > 0 && userSetting.getRecordPath() == null) {
logger.info("推流时发现尚未设置录像路径从assist服务中读取"); logger.info("推流时发现尚未设置录像路径从assist服务中读取");
JSONObject info = assistRESTfulUtils.getInfo(mediaInfo, null); JSONObject info = assistRESTfulUtils.getInfo(mediaInfo, null);
@ -291,6 +273,18 @@ public class ZLMHttpHookListener {
} }
} }
} }
if (param.getApp().equalsIgnoreCase("rtp")) {
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + param.getStream();
System.out.println(receiveKey);
OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(receiveKey);
System.out.println("otherRtpSendInfo != null ====>" + (otherRtpSendInfo != null));
if (otherRtpSendInfo != null) {
System.out.println("otherRtpSendInfo != null");
result.setEnable_audio(true);
result.setEnable_mp4(true);
}
}
logger.info("[ZLM HOOK]推流鉴权 响应:{}->{}->>>>{}", param.getMediaServerId(), param, result);
return result; return result;
} }

View File

@ -1,20 +1,18 @@
package com.genersoft.iot.vmp.media.zlm; package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.*; import java.util.HashMap;
import java.util.Map;
@Component @Component
public class ZLMRTPServerFactory { public class ZLMRTPServerFactory {
@ -30,68 +28,8 @@ public class ZLMRTPServerFactory {
@Autowired @Autowired
private ZlmHttpHookSubscribe hookSubscribe; private ZlmHttpHookSubscribe hookSubscribe;
private int[] portRangeArray = new int[2]; @Autowired
private SendRtpPortManager sendRtpPortManager;
public int getFreePort(MediaServerItem mediaServerItem, int startPort, int endPort, List<Integer> usedFreelist) {
if (endPort <= startPort) {
return -1;
}
if (usedFreelist == null) {
usedFreelist = new ArrayList<>();
}
JSONObject listRtpServerJsonResult = zlmresTfulUtils.listRtpServer(mediaServerItem);
if (listRtpServerJsonResult != null) {
JSONArray data = listRtpServerJsonResult.getJSONArray("data");
if (data != null) {
for (int i = 0; i < data.size(); i++) {
JSONObject dataItem = data.getJSONObject(i);
usedFreelist.add(dataItem.getInteger("port"));
}
}
}
Map<String, Object> param = new HashMap<>();
int result = -1;
// 设置推流端口
if (startPort%2 == 1) {
startPort ++;
}
boolean checkPort = false;
for (int i = startPort; i < endPort + 1; i+=2) {
if (!usedFreelist.contains(i)){
checkPort = true;
startPort = i;
break;
}
}
if (!checkPort) {
logger.warn("未找到节点{}上范围[{}-{}]的空闲端口", mediaServerItem.getId(), startPort, endPort);
return -1;
}
param.put("port", startPort);
String stream = UUID.randomUUID().toString();
param.put("enable_tcp", 1);
param.put("stream_id", stream);
// param.put("port", 0);
JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param);
if (openRtpServerResultJson != null) {
if (openRtpServerResultJson.getInteger("code") == 0) {
result= openRtpServerResultJson.getInteger("port");
Map<String, Object> closeRtpServerParam = new HashMap<>();
closeRtpServerParam.put("stream_id", stream);
zlmresTfulUtils.closeRtpServer(mediaServerItem, closeRtpServerParam);
}else {
usedFreelist.add(startPort);
startPort +=2;
result = getFreePort(mediaServerItem, startPort, endPort,usedFreelist);
}
}else {
// 检查ZLM状态
logger.error("创建RTP Server 失败 {}: 请检查ZLM服务", param.get("port"));
}
return result;
}
/** /**
* rtpServer * rtpServer
@ -220,13 +158,13 @@ public class ZLMRTPServerFactory {
* @return SendRtpItem * @return SendRtpItem
*/ */
public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId,
String deviceId, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){ String deviceId, String channelId, boolean tcp, boolean rtcp){
// 默认为随机端口 // 默认为随机端口
int localPort = 0; int localPort = 0;
if (userSetting.getGbSendStreamStrict()) { if (userSetting.getGbSendStreamStrict()) {
if (userSetting.getGbSendStreamStrict()) { if (userSetting.getGbSendStreamStrict()) {
localPort = keepPort(serverItem, ssrc, localPort, callback); localPort = sendRtpPortManager.getNextPort(serverItem.getId());
if (localPort == 0) { if (localPort == 0) {
return null; return null;
} }
@ -259,11 +197,11 @@ public class ZLMRTPServerFactory {
* @return SendRtpItem * @return SendRtpItem
*/ */
public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId,
String app, String stream, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){ String app, String stream, String channelId, boolean tcp, boolean rtcp){
// 默认为随机端口 // 默认为随机端口
int localPort = 0; int localPort = 0;
if (userSetting.getGbSendStreamStrict()) { if (userSetting.getGbSendStreamStrict()) {
localPort = keepPort(serverItem, ssrc, localPort, callback); localPort = sendRtpPortManager.getNextPort(serverItem.getId());
if (localPort == 0) { if (localPort == 0) {
return null; return null;
} }
@ -284,58 +222,6 @@ public class ZLMRTPServerFactory {
return sendRtpItem; return sendRtpItem;
} }
public interface KeepPortCallback{
Boolean keep(String ssrc);
}
/**
*
*/
public int keepPort(MediaServerItem serverItem, String ssrc, int localPort, KeepPortCallback keepPortCallback) {
Map<String, Object> param = new HashMap<>(3);
param.put("port", localPort);
param.put("enable_tcp", 1);
param.put("stream_id", ssrc);
JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param);
if (jsonObject.getInteger("code") == 0) {
localPort = jsonObject.getInteger("port");
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
Integer finalLocalPort = localPort;
hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
(MediaServerItem mediaServerItem, JSONObject response)->{
System.out.println("监听端口到期继续保持监听");
System.out.println(response);
if (ssrc.equals(response.getString("stream_id"))) {
if (keepPortCallback.keep(ssrc)) {
logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc);
keepPort(serverItem, ssrc, finalLocalPort, keepPortCallback);
}else {
logger.info("[上级点播] {}->发送取消,无需继续监听", ssrc);
releasePort(serverItem, ssrc);
}
}
});
logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort);
}else {
logger.info("[上级点播] 监听端口失败: {}", ssrc);
}
return localPort;
}
/**
*
*/
public boolean releasePort(MediaServerItem serverItem, String ssrc) {
logger.info("[上级点播] {}->释放监听端口", ssrc);
boolean closeRTPServerResult = closeRtpServer(serverItem, ssrc);
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
return closeRTPServerResult;
}
/** /**
* zlm RESTFUL API startSendRtp * zlm RESTFUL API startSendRtp
*/ */

View File

@ -0,0 +1,50 @@
package com.genersoft.iot.vmp.media.zlm.dto;
public class MediaSendRtpPortInfo {
private int start;
private int end;
private String mediaServerId;
private int current;
public MediaSendRtpPortInfo(int start, int end, String mediaServerId) {
this.start = start;
this.current = start;
this.end = end;
this.mediaServerId = mediaServerId;
}
public int getStart() {
return start;
}
public void setStart(int start) {
this.start = start;
}
public int getEnd() {
return end;
}
public void setEnd(int end) {
this.end = end;
}
public String getMediaServerId() {
return mediaServerId;
}
public void setMediaServerId(String mediaServerId) {
this.mediaServerId = mediaServerId;
}
public int getCurrent() {
return current;
}
public void setCurrent(int current) {
this.current = current;
}
}

View File

@ -62,6 +62,9 @@ public class MediaServerItem{
@Schema(description = "多端口RTP收流端口范围") @Schema(description = "多端口RTP收流端口范围")
private String rtpPortRange; private String rtpPortRange;
@Schema(description = "RTP发流端口范围")
private String sendRtpPortRange;
@Schema(description = "assist服务端口") @Schema(description = "assist服务端口")
private int recordAssistPort; private int recordAssistPort;
@ -297,4 +300,12 @@ public class MediaServerItem{
public void setHookAliveInterval(Float hookAliveInterval) { public void setHookAliveInterval(Float hookAliveInterval) {
this.hookAliveInterval = hookAliveInterval; this.hookAliveInterval = hookAliveInterval;
} }
public String getSendRtpPortRange() {
return sendRtpPortRange;
}
public void setSendRtpPortRange(String sendRtpPortRange) {
this.sendRtpPortRange = sendRtpPortRange;
}
} }

View File

@ -50,4 +50,14 @@ public class HookResultForOnPublish extends HookResult{
public void setMp4_save_path(String mp4_save_path) { public void setMp4_save_path(String mp4_save_path) {
this.mp4_save_path = mp4_save_path; this.mp4_save_path = mp4_save_path;
} }
@Override
public String toString() {
return "HookResultForOnPublish{" +
"enable_audio=" + enable_audio +
", enable_mp4=" + enable_mp4 +
", mp4_max_second=" + mp4_max_second +
", mp4_save_path='" + mp4_save_path + '\'' +
'}';
}
} }

View File

@ -11,10 +11,7 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.*;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
@ -69,6 +66,10 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Autowired @Autowired
private UserSetting userSetting; private UserSetting userSetting;
@Autowired
private SendRtpPortManager sendRtpPortManager;
@Autowired @Autowired
private AssistRESTfulUtils assistRESTfulUtils; private AssistRESTfulUtils assistRESTfulUtils;
@ -115,13 +116,40 @@ public class MediaServerServiceImpl implements IMediaServerService {
if (ssrcFactory.hasMediaServerSSRC(mediaServerItem.getId())) { if (ssrcFactory.hasMediaServerSSRC(mediaServerItem.getId())) {
ssrcFactory.initMediaServerSSRC(mediaServerItem.getId(), null); ssrcFactory.initMediaServerSSRC(mediaServerItem.getId(), null);
} }
if (userSetting.getGbSendStreamStrict()) {
int startPort = 50000;
int endPort = 60000;
String sendRtpPortRange = mediaServerItem.getSendRtpPortRange();
if (sendRtpPortRange == null) {
logger.warn("[zlm] ] 未配置发流端口范围默认使用50000到60000");
}else {
String[] sendRtpPortRangeArray = sendRtpPortRange.trim().split(",");
if (sendRtpPortRangeArray.length != 2) {
logger.warn("[zlm] ] 发流端口范围错误默认使用50000到60000");
}else {
try {
startPort = Integer.parseInt(sendRtpPortRangeArray[0]);
endPort = Integer.parseInt(sendRtpPortRangeArray[1]);
if (endPort <= startPort) {
logger.warn("[zlm] ] 发流端口范围错误,结束端口应大于开始端口,使用默认端口");
startPort = 50000;
endPort = 60000;
}
}catch (NumberFormatException e) {
logger.warn("[zlm] ] 发流端口范围错误默认使用50000到60000");
}
}
}
logger.info("[[zlm] ] 配置发流端口范围,{}-{}", startPort, endPort);
sendRtpPortManager.initServerPort(mediaServerItem.getId(), startPort, endPort);
}
// 查询redis是否存在此mediaServer // 查询redis是否存在此mediaServer
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId(); String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId();
Boolean hasKey = redisTemplate.hasKey(key); Boolean hasKey = redisTemplate.hasKey(key);
if (hasKey != null && ! hasKey) { if (hasKey != null && ! hasKey) {
redisTemplate.opsForValue().set(key, mediaServerItem); redisTemplate.opsForValue().set(key, mediaServerItem);
} }
} }
} }

View File

@ -317,9 +317,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
content.getPort(), content.getSsrc(), content.getPlatformId(), content.getPort(), content.getSsrc(), content.getPlatformId(),
content.getApp(), content.getStream(), content.getChannelId(), content.getApp(), content.getStream(), content.getChannelId(),
content.getTcp(), content.getRtcp(), ssrcFromCallback -> { content.getTcp(), content.getRtcp());
return querySendRTPServer(content.getPlatformId(), content.getChannelId(), content.getStream(), null) != null;
});
WVPResult<ResponseSendItemMsg> result = new WVPResult<>(); WVPResult<ResponseSendItemMsg> result = new WVPResult<>();
result.setCode(0); result.setCode(0);

View File

@ -28,6 +28,7 @@ public interface MediaServerMapper {
"secret, " + "secret, " +
"rtpEnable, " + "rtpEnable, " +
"rtpPortRange, " + "rtpPortRange, " +
"sendRtpPortRange, " +
"recordAssistPort, " + "recordAssistPort, " +
"defaultServer, " + "defaultServer, " +
"createTime, " + "createTime, " +
@ -51,6 +52,7 @@ public interface MediaServerMapper {
"#{secret}, " + "#{secret}, " +
"#{rtpEnable}, " + "#{rtpEnable}, " +
"#{rtpPortRange}, " + "#{rtpPortRange}, " +
"#{sendRtpPortRange}, " +
"#{recordAssistPort}, " + "#{recordAssistPort}, " +
"#{defaultServer}, " + "#{defaultServer}, " +
"#{createTime}, " + "#{createTime}, " +
@ -75,6 +77,7 @@ public interface MediaServerMapper {
"<if test=\"autoConfig != null\">, autoConfig=#{autoConfig}</if>" + "<if test=\"autoConfig != null\">, autoConfig=#{autoConfig}</if>" +
"<if test=\"rtpEnable != null\">, rtpEnable=#{rtpEnable}</if>" + "<if test=\"rtpEnable != null\">, rtpEnable=#{rtpEnable}</if>" +
"<if test=\"rtpPortRange != null\">, rtpPortRange=#{rtpPortRange}</if>" + "<if test=\"rtpPortRange != null\">, rtpPortRange=#{rtpPortRange}</if>" +
"<if test=\"sendRtpPortRange != null\">, sendRtpPortRange=#{sendRtpPortRange}</if>" +
"<if test=\"secret != null\">, secret=#{secret}</if>" + "<if test=\"secret != null\">, secret=#{secret}</if>" +
"<if test=\"recordAssistPort != null\">, recordAssistPort=#{recordAssistPort}</if>" + "<if test=\"recordAssistPort != null\">, recordAssistPort=#{recordAssistPort}</if>" +
"<if test=\"hookAliveInterval != null\">, hookAliveInterval=#{hookAliveInterval}</if>" + "<if test=\"hookAliveInterval != null\">, hookAliveInterval=#{hookAliveInterval}</if>" +
@ -98,6 +101,7 @@ public interface MediaServerMapper {
"<if test=\"autoConfig != null\">, autoConfig=#{autoConfig}</if>" + "<if test=\"autoConfig != null\">, autoConfig=#{autoConfig}</if>" +
"<if test=\"rtpEnable != null\">, rtpEnable=#{rtpEnable}</if>" + "<if test=\"rtpEnable != null\">, rtpEnable=#{rtpEnable}</if>" +
"<if test=\"rtpPortRange != null\">, rtpPortRange=#{rtpPortRange}</if>" + "<if test=\"rtpPortRange != null\">, rtpPortRange=#{rtpPortRange}</if>" +
"<if test=\"sendRtpPortRange != null\">, sendRtpPortRange=#{sendRtpPortRange}</if>" +
"<if test=\"secret != null\">, secret=#{secret}</if>" + "<if test=\"secret != null\">, secret=#{secret}</if>" +
"<if test=\"recordAssistPort != null\">, recordAssistPort=#{recordAssistPort}</if>" + "<if test=\"recordAssistPort != null\">, recordAssistPort=#{recordAssistPort}</if>" +
"<if test=\"hookAliveInterval != null\">, hookAliveInterval=#{hookAliveInterval}</if>" + "<if test=\"hookAliveInterval != null\">, hookAliveInterval=#{hookAliveInterval}</if>" +

View File

@ -3,20 +3,16 @@ package com.genersoft.iot.vmp.vmanager.rtp;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.VersionInfo;
import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo; import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
@ -27,7 +23,6 @@ import okhttp3.Request;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
@ -36,6 +31,7 @@ import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit;
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
@Tag(name = "第三方服务对接") @Tag(name = "第三方服务对接")
@ -56,20 +52,11 @@ public class RtpController {
private IMediaServerService mediaServerService; private IMediaServerService mediaServerService;
@Autowired @Autowired
private VersionInfo versionInfo; private SendRtpPortManager sendRtpPortManager;
@Autowired
private SipConfig sipConfig;
@Autowired @Autowired
private UserSetting userSetting; private UserSetting userSetting;
@Autowired
private IDeviceService deviceService;
@Autowired
private IDeviceChannelService channelService;
@Autowired @Autowired
private DynamicTask dynamicTask; private DynamicTask dynamicTask;
@ -78,14 +65,6 @@ public class RtpController {
private RedisTemplate<Object, Object> redisTemplate; private RedisTemplate<Object, Object> redisTemplate;
@Value("${server.port}")
private int serverPort;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@GetMapping(value = "/receive/open") @GetMapping(value = "/receive/open")
@ResponseBody @ResponseBody
@Operation(summary = "开启收流和获取发流信息") @Operation(summary = "开启收流和获取发流信息")
@ -145,24 +124,15 @@ public class RtpController {
otherRtpSendInfo.setReceivePort(localPort); otherRtpSendInfo.setReceivePort(localPort);
otherRtpSendInfo.setCallId(callId); otherRtpSendInfo.setCallId(callId);
otherRtpSendInfo.setStream(stream); otherRtpSendInfo.setStream(stream);
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + stream; String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + stream;
// 将信息写入redis中以备后用 // 将信息写入redis中以备后用
redisTemplate.opsForValue().set(receiveKey, otherRtpSendInfo); redisTemplate.opsForValue().set(receiveKey, otherRtpSendInfo);
if (isSend != null && isSend) { if (isSend != null && isSend) {
String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId;
// 预创建发流信息 // 预创建发流信息
int port = zlmServerFactory.keepPort(mediaServerItem, callId, 0, ssrc1 -> { int port = sendRtpPortManager.getNextPort(mediaServerItem.getId());
return redisTemplate.opsForValue().get(key) != null;
});
// 将信息写入redis中以备后用 // 将信息写入redis中以备后用
redisTemplate.opsForValue().set(key, otherRtpSendInfo); redisTemplate.opsForValue().set(key, otherRtpSendInfo, 300, TimeUnit.SECONDS);
// 设置超时任务,超时未使用,则自动移除,并关闭端口保持, 默认五分钟
dynamicTask.startDelay(key, ()->{
logger.info("[第三方服务对接->开启收流和获取发流信息] 端口保持超时 callId->{}", callId);
redisTemplate.delete(key);
zlmServerFactory.releasePort(mediaServerItem, callId);
}, 15000);
otherRtpSendInfo.setIp(mediaServerItem.getSdpIp()); otherRtpSendInfo.setIp(mediaServerItem.getSdpIp());
otherRtpSendInfo.setPort(port); otherRtpSendInfo.setPort(port);
logger.info("[第三方服务对接->开启收流和获取发流信息] 结果callId->{} {}", callId, otherRtpSendInfo); logger.info("[第三方服务对接->开启收流和获取发流信息] 结果callId->{} {}", callId, otherRtpSendInfo);
@ -178,7 +148,7 @@ public class RtpController {
logger.info("[第三方服务对接->关闭收流] stream->{}", stream); logger.info("[第三方服务对接->关闭收流] stream->{}", stream);
MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
zlmServerFactory.closeRtpServer(mediaServerItem,stream); zlmServerFactory.closeRtpServer(mediaServerItem,stream);
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + stream; String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + stream;
// 将信息写入redis中以备后用 // 将信息写入redis中以备后用
redisTemplate.delete(receiveKey); redisTemplate.delete(receiveKey);
} }
@ -203,11 +173,9 @@ public class RtpController {
streamType = 1; streamType = 1;
} }
MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId;
OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
if (sendInfo != null) { if (sendInfo == null) {
zlmServerFactory.releasePort(mediaServerItem, callId);
}else {
sendInfo = new OtherRtpSendInfo(); sendInfo = new OtherRtpSendInfo();
} }
sendInfo.setPushApp(app); sendInfo.setPushApp(app);
@ -229,7 +197,6 @@ public class RtpController {
param.put("only_audio", onlyAudio ? "1" : "0"); param.put("only_audio", onlyAudio ? "1" : "0");
param.put("pt", pt); param.put("pt", pt);
dynamicTask.stop(key);
Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream);
if (streamReady) { if (streamReady) {
logger.info("[第三方服务对接->发送流] 流存在开始发流callId->{}", callId); logger.info("[第三方服务对接->发送流] 流存在开始发流callId->{}", callId);
@ -279,7 +246,7 @@ public class RtpController {
@Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true) @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true)
public void closeSendRTP(String callId) { public void closeSendRTP(String callId) {
logger.info("[第三方服务对接->关闭发送流] callId->{}", callId); logger.info("[第三方服务对接->关闭发送流] callId->{}", callId);
String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId;
OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
if (sendInfo == null){ if (sendInfo == null){
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未开启发流"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "未开启发流");

View File

@ -89,6 +89,11 @@
- -
<el-input v-model="rtpPortRange2" placeholder="终止" @change="portRangeChange" clearable style="width: 100px" prop="rtpPortRange2" :disabled="mediaServerForm.defaultServer"></el-input> <el-input v-model="rtpPortRange2" placeholder="终止" @change="portRangeChange" clearable style="width: 100px" prop="rtpPortRange2" :disabled="mediaServerForm.defaultServer"></el-input>
</el-form-item> </el-form-item>
<el-form-item v-if="mediaServerForm.sendRtpEnable" label="发流端口" >
<el-input v-model="sendRtpPortRange1" placeholder="起始" @change="portRangeChange" clearable style="width: 100px" prop="rtpPortRange1" :disabled="mediaServerForm.defaultServer"></el-input>
-
<el-input v-model="sendRtpPortRange2" placeholder="终止" @change="portRangeChange" clearable style="width: 100px" prop="rtpPortRange2" :disabled="mediaServerForm.defaultServer"></el-input>
</el-form-item>
<el-form-item label="录像管理服务端口" prop="recordAssistPort"> <el-form-item label="录像管理服务端口" prop="recordAssistPort">
<el-input v-model.number="mediaServerForm.recordAssistPort" :disabled="mediaServerForm.defaultServer"> <el-input v-model.number="mediaServerForm.recordAssistPort" :disabled="mediaServerForm.defaultServer">
<!-- <el-button v-if="mediaServerForm.recordAssistPort > 0" slot="append" type="primary" @click="checkRecordServer"></el-button>--> <!-- <el-button v-if="mediaServerForm.recordAssistPort > 0" slot="append" type="primary" @click="checkRecordServer"></el-button>-->
@ -172,6 +177,7 @@ export default {
rtmpSSlPort: "", rtmpSSlPort: "",
rtpEnable: false, rtpEnable: false,
rtpPortRange: "", rtpPortRange: "",
sendRtpPortRange: "",
rtpProxyPort: "", rtpProxyPort: "",
rtspPort: "", rtspPort: "",
rtspSSLPort: "", rtspSSLPort: "",
@ -179,6 +185,9 @@ export default {
rtpPortRange1:30000, rtpPortRange1:30000,
rtpPortRange2:30500, rtpPortRange2:30500,
sendRtpPortRange1:50000,
sendRtpPortRange2:60000,
rules: { rules: {
ip: [{ required: true, validator: isValidIp, message: '请输入有效的IP地址', trigger: 'blur' }], ip: [{ required: true, validator: isValidIp, message: '请输入有效的IP地址', trigger: 'blur' }],
httpPort: [{ required: true, validator: isValidPort, message: '请输入有效的端口号', trigger: 'blur' }], httpPort: [{ required: true, validator: isValidPort, message: '请输入有效的端口号', trigger: 'blur' }],
@ -214,10 +223,15 @@ export default {
this.currentStep = 3; this.currentStep = 3;
if (param.rtpPortRange) { if (param.rtpPortRange) {
let rtpPortRange = this.mediaServerForm.rtpPortRange.split(","); let rtpPortRange = this.mediaServerForm.rtpPortRange.split(",");
let sendRtpPortRange = this.mediaServerForm.sendRtpPortRange.split(",");
if (rtpPortRange.length > 0) { if (rtpPortRange.length > 0) {
this.rtpPortRange1 = rtpPortRange[0] this.rtpPortRange1 = rtpPortRange[0]
this.rtpPortRange2 = rtpPortRange[1] this.rtpPortRange2 = rtpPortRange[1]
} }
if (sendRtpPortRange.length > 0) {
this.sendRtpPortRange1 = sendRtpPortRange[0]
this.sendRtpPortRange2 = sendRtpPortRange[1]
}
} }
} }
}, },
@ -240,6 +254,8 @@ export default {
that.mediaServerForm.autoConfig = true; that.mediaServerForm.autoConfig = true;
that.rtpPortRange1 = 30000 that.rtpPortRange1 = 30000
that.rtpPortRange2 = 30500 that.rtpPortRange2 = 30500
that.sendRtpPortRange1 = 50000
that.sendRtpPortRange2 = 60000
that.serverCheck = 1; that.serverCheck = 1;
}else { }else {
that.serverCheck = -1; that.serverCheck = -1;
@ -321,12 +337,15 @@ export default {
rtmpSSlPort: "", rtmpSSlPort: "",
rtpEnable: false, rtpEnable: false,
rtpPortRange: "", rtpPortRange: "",
sendRtpPortRange: "",
rtpProxyPort: "", rtpProxyPort: "",
rtspPort: "", rtspPort: "",
rtspSSLPort: "", rtspSSLPort: "",
}; };
this.rtpPortRange1 = 30500; this.rtpPortRange1 = 30500;
this.rtpPortRange2 = 30500; this.rtpPortRange2 = 30500;
this.sendRtpPortRange1 = 50000;
this.sendRtpPortRange2 = 60000;
this.listChangeCallback = null this.listChangeCallback = null
this.currentStep = 1; this.currentStep = 1;
}, },
@ -351,7 +370,7 @@ export default {
portRangeChange: function() { portRangeChange: function() {
if (this.mediaServerForm.rtpEnable) { if (this.mediaServerForm.rtpEnable) {
this.mediaServerForm.rtpPortRange = this.rtpPortRange1 + "," + this.rtpPortRange2 this.mediaServerForm.rtpPortRange = this.rtpPortRange1 + "," + this.rtpPortRange2
console.log(this.mediaServerForm.rtpPortRange) this.mediaServerForm.sendRtpPortRange = this.sendRtpPortRange1 + "," + this.sendRtpPortRange2
} }
} }
}, },