From 9fc3db1f5e381378fd54818e0ba017df6be96fa9 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 13 Jul 2023 14:30:41 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=8F=91=E6=B5=81=E9=9F=B3?= =?UTF-8?q?=E8=A7=86=E9=A2=91=E5=88=86=E5=BC=80=E5=8F=91=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/media/zlm/dto/HookType.java | 3 +- .../vmp/vmanager/bean/OtherRtpSendInfo.java | 89 +++++--- .../iot/vmp/vmanager/rtp/RtpController.java | 198 +++++++++++++----- 3 files changed, 211 insertions(+), 79 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java index a4557e9a..235cea7b 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java @@ -21,5 +21,6 @@ public enum HookType { on_server_started, on_rtp_server_timeout, - on_server_keepalive + on_server_keepalive, + on_send_rtp_stopped } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java index 9fea5c54..75c05d3b 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java @@ -5,12 +5,17 @@ public class OtherRtpSendInfo { /** * 发流IP */ - private String ip; + private String sendLocalIp; /** - * 发流端口 + * 音频发流端口 */ - private int port; + private int sendLocalPortForAudio; + + /** + * 视频发流端口 + */ + private int sendLocalPortForVideo; /** * 收流IP @@ -18,9 +23,14 @@ public class OtherRtpSendInfo { private String receiveIp; /** - * 收流端口 + * 音频收流端口 */ - private int receivePort; + private int receivePortForAudio; + + /** + * 视频收流端口 + */ + private int receivePortForVideo; /** * 会话ID @@ -48,23 +58,6 @@ public class OtherRtpSendInfo { private String pushSSRC; - - public String getIp() { - return ip; - } - - public void setIp(String ip) { - this.ip = ip; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - public String getReceiveIp() { return receiveIp; } @@ -73,12 +66,20 @@ public class OtherRtpSendInfo { this.receiveIp = receiveIp; } - public int getReceivePort() { - return receivePort; + public int getReceivePortForAudio() { + return receivePortForAudio; } - public void setReceivePort(int receivePort) { - this.receivePort = receivePort; + public void setReceivePortForAudio(int receivePortForAudio) { + this.receivePortForAudio = receivePortForAudio; + } + + public int getReceivePortForVideo() { + return receivePortForVideo; + } + + public void setReceivePortForVideo(int receivePortForVideo) { + this.receivePortForVideo = receivePortForVideo; } public String getCallId() { @@ -121,15 +122,45 @@ public class OtherRtpSendInfo { this.pushSSRC = pushSSRC; } + + public String getSendLocalIp() { + return sendLocalIp; + } + + public void setSendLocalIp(String sendLocalIp) { + this.sendLocalIp = sendLocalIp; + } + + public int getSendLocalPortForAudio() { + return sendLocalPortForAudio; + } + + public void setSendLocalPortForAudio(int sendLocalPortForAudio) { + this.sendLocalPortForAudio = sendLocalPortForAudio; + } + + public int getSendLocalPortForVideo() { + return sendLocalPortForVideo; + } + + public void setSendLocalPortForVideo(int sendLocalPortForVideo) { + this.sendLocalPortForVideo = sendLocalPortForVideo; + } + @Override public String toString() { return "OtherRtpSendInfo{" + - " ip='" + ip + '\'' + - ", port=" + port + + "sendLocalIp='" + sendLocalIp + '\'' + + ", sendLocalPortForAudio=" + sendLocalPortForAudio + + ", sendLocalPortForVideo=" + sendLocalPortForVideo + ", receiveIp='" + receiveIp + '\'' + - ", receivePort=" + receivePort + + ", receivePortForAudio=" + receivePortForAudio + + ", receivePortForVideo=" + receivePortForVideo + ", callId='" + callId + '\'' + ", stream='" + stream + '\'' + + ", pushApp='" + pushApp + '\'' + + ", pushStream='" + pushStream + '\'' + + ", pushSSRC='" + pushSSRC + '\'' + '}'; } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java index b7a7e152..a9a66f5c 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -98,10 +98,13 @@ public class RtpController { }catch (NumberFormatException e) { throw new ControllerException(ErrorCode.ERROR100.getCode(),"ssrc格式错误"); } - } String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + callId + "_" + stream; - int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode); + int localPortForVideo = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode); + int localPortForAudio = zlmServerFactory.createRTPServer(mediaServerItem, stream + "_a" , ssrcInt, null, false, tcpMode); + if (localPortForVideo == 0 || localPortForAudio == 0) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败"); + } // 注册回调如果rtp收流超时则通过回调发送通知 if (callBack != null) { HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(stream, String.valueOf(ssrcInt), mediaServerItem.getId()); @@ -121,12 +124,14 @@ public class RtpController { } catch (IOException e) { logger.error("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e); } + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); } }); } OtherRtpSendInfo otherRtpSendInfo = new OtherRtpSendInfo(); otherRtpSendInfo.setReceiveIp(mediaServerItem.getSdpIp()); - otherRtpSendInfo.setReceivePort(localPort); + otherRtpSendInfo.setReceivePortForVideo(localPortForVideo); + otherRtpSendInfo.setReceivePortForAudio(localPortForAudio); otherRtpSendInfo.setCallId(callId); otherRtpSendInfo.setStream(stream); @@ -135,11 +140,13 @@ public class RtpController { if (isSend != null && isSend) { String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; // 预创建发流信息 - int port = sendRtpPortManager.getNextPort(mediaServerItem.getId()); + int portForVideo = sendRtpPortManager.getNextPort(mediaServerItem.getId()); + int portForAudio = sendRtpPortManager.getNextPort(mediaServerItem.getId()); // 将信息写入redis中,以备后用 redisTemplate.opsForValue().set(key, otherRtpSendInfo, 300, TimeUnit.SECONDS); - otherRtpSendInfo.setIp(mediaServerItem.getSdpIp()); - otherRtpSendInfo.setPort(port); + otherRtpSendInfo.setSendLocalIp(mediaServerItem.getSdpIp()); + otherRtpSendInfo.setSendLocalPortForVideo(portForVideo); + otherRtpSendInfo.setSendLocalPortForAudio(portForAudio); logger.info("[第三方服务对接->开启收流和获取发流信息] 结果,callId->{}, {}", callId, otherRtpSendInfo); } return otherRtpSendInfo; @@ -153,6 +160,7 @@ public class RtpController { logger.info("[第三方服务对接->关闭收流] stream->{}", stream); MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); zlmServerFactory.closeRtpServer(mediaServerItem,stream); + zlmServerFactory.closeRtpServer(mediaServerItem,stream + "_a"); String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_*_" + stream; List scan = RedisUtil.scan(redisTemplate, receiveKey); if (scan.size() > 0) { @@ -167,20 +175,51 @@ public class RtpController { @ResponseBody @Operation(summary = "发送流") @Parameter(name = "ssrc", description = "发送流的SSRC", required = true) - @Parameter(name = "ip", description = "目标IP", required = true) - @Parameter(name = "port", description = "目标端口", required = true) + @Parameter(name = "dstIpForAudio", description = "目标音频收流IP", required = false) + @Parameter(name = "dstIpForVideo", description = "目标视频收流IP", required = false) + @Parameter(name = "dstPortForAudio", description = "目标音频收流端口", required = false) + @Parameter(name = "dstPortForVideo", description = "目标视频收流端口", required = false) @Parameter(name = "app", description = "待发送应用名", required = true) @Parameter(name = "stream", description = "待发送流Id", required = true) @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true) - @Parameter(name = "onlyAudio", description = "是否只有音频", required = true) @Parameter(name = "isUdp", description = "是否为UDP", required = true) - @Parameter(name = "streamType", description = "流类型,1为es流,2为ps流, 默认es流", required = false) - @Parameter(name = "pt", description = "rtp的pt", required = true) - public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, @RequestParam(required = false)Integer streamType, Integer pt) { - logger.info("[第三方服务对接->发送流] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}, pt->{}", - ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS", pt); - if (ObjectUtils.isEmpty(streamType)) { - streamType = 1; + @Parameter(name = "ptForAudio", description = "rtp的音频pt", required = false) + @Parameter(name = "ptForVideo", description = "rtp的视频pt", required = false) + public void sendRTP(String ssrc, + @RequestParam(required = false)String dstIpForAudio, + @RequestParam(required = false)String dstIpForVideo, + @RequestParam(required = false)Integer dstPortForAudio, + @RequestParam(required = false)Integer dstPortForVideo, + String app, + String stream, + String callId, + Boolean isUdp, + @RequestParam(required = false)Integer ptForAudio, + @RequestParam(required = false)Integer ptForVideo + ) { + logger.info("[第三方服务对接->发送流] " + + "ssrc->{}, \r\n" + + "dstIpForAudio->{}, \n" + + "dstIpForAudio->{}, \n" + + "dstPortForAudio->{}, \n" + + "dstPortForVideo->{}, \n" + + "app->{}, \n" + + "stream->{}, \n" + + "callId->{}, \n" + + "ptForAudio->{}, \n" + + "ptForVideo->{}", + ssrc, + dstIpForAudio, + dstIpForVideo, + dstPortForAudio, + dstPortForVideo, + app, + stream, + callId, + ptForAudio, + ptForVideo); + if (!((dstPortForAudio > 0 && !ObjectUtils.isEmpty(dstPortForAudio) || (dstPortForVideo > 0 && !ObjectUtils.isEmpty(dstIpForVideo))))) { + throw new ControllerException(ErrorCode.ERROR400.getCode(), "至少应该存在一组音频或视频发送参数"); } MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; @@ -192,32 +231,74 @@ public class RtpController { sendInfo.setPushStream(stream); sendInfo.setPushSSRC(ssrc); - Map param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",app); - param.put("stream",stream); - param.put("ssrc", ssrc); + Map paramForAudio; + Map paramForVideo; + if (!ObjectUtils.isEmpty(dstIpForAudio) && dstPortForAudio > 0) { + paramForAudio = new HashMap<>(); + paramForAudio.put("vhost","__defaultVhost__"); + paramForAudio.put("app",app); + paramForAudio.put("stream",stream); + paramForAudio.put("ssrc", ssrc); - param.put("dst_url",ip); - param.put("dst_port", port); - String is_Udp = isUdp ? "1" : "0"; - param.put("is_udp", is_Udp); - param.put("src_port", sendInfo.getPort()); - param.put("use_ps", streamType==2 ? "1" : "0"); - param.put("only_audio", onlyAudio ? "1" : "0"); - param.put("pt", pt); + paramForAudio.put("dst_url", dstIpForAudio); + paramForAudio.put("dst_port", dstPortForAudio); + String is_Udp = isUdp ? "1" : "0"; + paramForAudio.put("is_udp", is_Udp); + paramForAudio.put("src_port", sendInfo.getSendLocalPortForAudio()); + paramForAudio.put("use_ps", "0"); + paramForAudio.put("only_audio", "1"); + if (ptForAudio != null) { + paramForAudio.put("pt", ptForAudio); + } + + } else { + paramForAudio = null; + } + if (!ObjectUtils.isEmpty(dstIpForVideo) && dstPortForVideo > 0) { + paramForVideo = new HashMap<>(); + paramForVideo.put("vhost","__defaultVhost__"); + paramForVideo.put("app",app); + paramForVideo.put("stream",stream); + paramForVideo.put("ssrc", ssrc); + + paramForVideo.put("dst_url", dstIpForVideo); + paramForVideo.put("dst_port", dstPortForVideo); + String is_Udp = isUdp ? "1" : "0"; + paramForVideo.put("is_udp", is_Udp); + paramForVideo.put("src_port", sendInfo.getSendLocalPortForVideo()); + paramForVideo.put("use_ps", "0"); + paramForVideo.put("only_audio", "0"); + if (ptForVideo != null) { + paramForVideo.put("pt", ptForVideo); + } + + } else { + paramForVideo = null; + } Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); if (streamReady) { - logger.info("[第三方服务对接->发送流] 流存在,开始发流,callId->{}", callId); - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); - if (jsonObject.getInteger("code") == 0) { - logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId); - redisTemplate.opsForValue().set(key, sendInfo); - }else { - redisTemplate.delete(key); - logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg")); + if (paramForVideo != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForVideo); + if (jsonObject.getInteger("code") == 0) { + logger.info("[第三方服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, paramForVideo); + redisTemplate.opsForValue().set(key, sendInfo); + }else { + redisTemplate.delete(key); + logger.info("[第三方服务对接->发送流] 视频流发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[视频流发流失败] " + jsonObject.getString("msg")); + } + } + if(paramForAudio != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForAudio); + if (jsonObject.getInteger("code") == 0) { + logger.info("[第三方服务对接->发送流] 音频流发流成功,callId->{},param->{}", callId, paramForAudio); + redisTemplate.opsForValue().set(key, sendInfo); + }else { + redisTemplate.delete(key); + logger.info("[第三方服务对接->发送流] 音频流发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[音频流发流失败] " + jsonObject.getString("msg")); + } } }else { logger.info("[第三方服务对接->发送流] 流不存在,等待流上线,callId->{}", callId); @@ -231,21 +312,39 @@ public class RtpController { // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 OtherRtpSendInfo finalSendInfo = sendInfo; + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItemInUse, response)->{ dynamicTask.stop(uuid); - logger.info("[第三方服务对接->发送流] 流上线,开始发流 callId->{},param->{}", callId, JSONObject.toJSONString(param)); - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); - System.out.println("========发流结果=========="); - System.out.println(jsonObject); - if (jsonObject.getInteger("code") == 0) { - logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId); - redisTemplate.opsForValue().set(key, finalSendInfo); - }else { - redisTemplate.delete(key); - logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg")); + logger.info("[第三方服务对接->发送流] 流上线,开始发流 callId->{}", callId); + try { + Thread.sleep(400); + } catch (InterruptedException e) { + throw new RuntimeException(e); } + if (paramForVideo != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForVideo); + if (jsonObject.getInteger("code") == 0) { + logger.info("[第三方服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, paramForVideo); + redisTemplate.opsForValue().set(key, finalSendInfo); + }else { + redisTemplate.delete(key); + logger.info("[第三方服务对接->发送流] 视频流发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[视频流发流失败] " + jsonObject.getString("msg")); + } + } + if(paramForAudio != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForAudio); + if (jsonObject.getInteger("code") == 0) { + logger.info("[第三方服务对接->发送流] 音频流发流成功,callId->{},param->{}", callId, paramForAudio); + redisTemplate.opsForValue().set(key, finalSendInfo); + }else { + redisTemplate.delete(key); + logger.info("[第三方服务对接->发送流] 音频流发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[音频流发流失败] " + jsonObject.getString("msg")); + } + } + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); }); } } @@ -274,6 +373,7 @@ public class RtpController { }else { logger.info("[第三方服务对接->关闭发送流] 成功 callId->{}", callId); } + redisTemplate.delete(key); } }