临时提交

结构优化
648540858 2023-07-01 10:14:19 +08:00
parent 1458014fe3
commit fb90dfdac5
3 changed files with 20 additions and 20 deletions

View File

@ -1031,6 +1031,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
logger.info("设备{}请求语音流, 收流地址:{}:{}ssrc{}, {}, 对讲方式:{}", requesterId, addressStr, port, ssrc, logger.info("设备{}请求语音流, 收流地址:{}:{}ssrc{}, {}, 对讲方式:{}", requesterId, addressStr, port, ssrc,
mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP", sdp.getSessionName().getValue()); mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP", sdp.getSessionName().getValue());
CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
device.getDeviceId(), broadcastCatch.getChannelId(), device.getDeviceId(), broadcastCatch.getChannelId(),
mediaTransmissionTCP, false, ssrcFromCallback -> { mediaTransmissionTCP, false, ssrcFromCallback -> {
@ -1050,7 +1051,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
sendRtpItem.setPlayType(InviteStreamType.BROADCAST); sendRtpItem.setPlayType(InviteStreamType.BROADCAST);
sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setCallId(callIdHeader.getCallId());
sendRtpItem.setPlatformId(requesterId); sendRtpItem.setPlatformId(requesterId);

View File

@ -174,9 +174,10 @@ public class ZLMRTPServerFactory {
Map<String, Object> param = new HashMap<>(); Map<String, Object> param = new HashMap<>();
param.put("stream_id", streamId); param.put("stream_id", streamId);
JSONObject jsonObject = zlmresTfulUtils.closeRtpServer(serverItem, param); JSONObject jsonObject = zlmresTfulUtils.closeRtpServer(serverItem, param);
logger.info("关闭RTP Server " + jsonObject);
if (jsonObject != null ) { if (jsonObject != null ) {
if (jsonObject.getInteger("code") == 0) { if (jsonObject.getInteger("code") == 0) {
result = jsonObject.getInteger("hit") == 1; result = jsonObject.getInteger("hit") >= 1;
}else { }else {
logger.error("关闭RTP Server 失败: " + jsonObject.getString("msg")); logger.error("关闭RTP Server 失败: " + jsonObject.getString("msg"));
} }
@ -301,6 +302,7 @@ public class ZLMRTPServerFactory {
param.put("port", localPort); param.put("port", localPort);
param.put("enable_tcp", 1); param.put("enable_tcp", 1);
param.put("stream_id", ssrc); param.put("stream_id", ssrc);
param.put("re_use_port", 1);
JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param); JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param);
if (jsonObject.getInteger("code") == 0) { if (jsonObject.getInteger("code") == 0) {
localPort = jsonObject.getInteger("port"); localPort = jsonObject.getInteger("port");
@ -335,6 +337,11 @@ public class ZLMRTPServerFactory {
public boolean releasePort(MediaServerItem serverItem, String ssrc) { public boolean releasePort(MediaServerItem serverItem, String ssrc) {
logger.info("[保持端口] {}->释放监听端口", ssrc); logger.info("[保持端口] {}->释放监听端口", ssrc);
boolean closeRTPServerResult = closeRtpServer(serverItem, ssrc); boolean closeRTPServerResult = closeRtpServer(serverItem, ssrc);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId()); HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统 // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);

View File

@ -25,7 +25,6 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
@ -1436,14 +1435,10 @@ public class PlayServiceImpl implements IPlayService {
@Override @Override
public void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader) { public void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader) {
// 开始发流 // 开始发流
// 取消设置的超时任务
// String channelId = request.getCallIdHeader().getCallId();
String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
logger.info("收到ACKrtp/{}开始推流, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStream(), logger.info("[开始推流] rtp/{}, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
Map<String, Object> param = new HashMap<>(12); Map<String, Object> param = new HashMap<>(12);
param.put("vhost", "__defaultVhost__"); param.put("vhost", "__defaultVhost__");
@ -1469,12 +1464,10 @@ public class PlayServiceImpl implements IPlayService {
startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader); startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader);
}); });
} else { } else {
// 如果是严格模式,需要关闭端口占用 // 如果是严格模式,需要关闭端口占用
JSONObject startSendRtpStreamResult = null; JSONObject startSendRtpStreamResult = null;
if (sendRtpItem.getLocalPort() != 0) { if (sendRtpItem.getLocalPort() != 0) {
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId()); zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc());
hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) {
if (sendRtpItem.isTcpActive()) { if (sendRtpItem.isTcpActive()) {
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
} else { } else {
@ -1482,7 +1475,6 @@ public class PlayServiceImpl implements IPlayService {
param.put("dst_port", sendRtpItem.getPort()); param.put("dst_port", sendRtpItem.getPort());
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
} }
}
} else { } else {
if (sendRtpItem.isTcpActive()) { if (sendRtpItem.isTcpActive()) {
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
@ -1505,7 +1497,8 @@ public class PlayServiceImpl implements IPlayService {
logger.error("RTP推流失败: 请检查ZLM服务"); logger.error("RTP推流失败: 请检查ZLM服务");
} else if (jsonObject.getInteger("code") == 0) { } else if (jsonObject.getInteger("code") == 0) {
logger.info("调用ZLM推流接口, 结果: {}", jsonObject); logger.info("调用ZLM推流接口, 结果: {}", jsonObject);
logger.info("RTP推流成功[ {}/{} ]{}->{}:{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); logger.info("RTP推流成功[ {}/{} ]{}->{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"),
sendRtpItem.isTcpActive()?"被动发流": param.get("dst_url") + ":" + param.get("dst_port"));
} else { } else {
logger.error("RTP推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSONObject.toJSONString(param)); logger.error("RTP推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSONObject.toJSONString(param));
if (sendRtpItem.isOnlyAudio()) { if (sendRtpItem.isOnlyAudio()) {