From 9c555b56b70ed05c1b13f0a26098b702d7364839 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 18 May 2022 18:14:46 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=AF=AD=E9=9F=B3=E5=B9=BF?= =?UTF-8?q?=E6=92=AD=E7=9A=84TCP=E4=B8=BB=E5=8A=A8=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../request/impl/AckRequestProcessor.java | 22 +++- .../request/impl/InviteRequestProcessor.java | 104 +++++++++++++++++- .../iot/vmp/media/zlm/ZLMRESTfulUtils.java | 4 + .../vmp/media/zlm/ZLMRTPServerFactory.java | 7 ++ 4 files changed, 127 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 153a08ab..85ee6471 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -107,29 +107,41 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, null, null, callIdHeader.getCallId()); String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - logger.info("收到ACK,开始向上级推流 rtp/{}", sendRtpItem.getStreamId()); + logger.info("[收到ACK],开始使用{}向上级推流 {}/{}->{}:{}({})", sendRtpItem.isTcp() ? "TCP" : "UDP", + sendRtpItem.getApp(), sendRtpItem.getStreamId(), + sendRtpItem.getIp() ,sendRtpItem.getPort(), + sendRtpItem.getSsrc()); Map param = new HashMap<>(); param.put("vhost","__defaultVhost__"); param.put("app",sendRtpItem.getApp()); param.put("stream",sendRtpItem.getStreamId()); param.put("ssrc", sendRtpItem.getSsrc()); - param.put("dst_url",sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - param.put("is_udp", is_Udp); param.put("src_port", sendRtpItem.getLocalPort()); param.put("pt", sendRtpItem.getPt()); param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + JSONObject jsonObject; + if (sendRtpItem.isTcpActive()) { + jsonObject = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); + }else { + param.put("is_udp", is_Udp); + param.put("dst_url",sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + } + if (jsonObject == null) { logger.error("RTP推流失败: 请检查ZLM服务"); } else if (jsonObject.getInteger("code") == 0) { + if (sendRtpItem.isOnlyAudio()) { AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok); audioBroadcastCatch.setDialog((SIPDialog) evt.getDialog()); audioBroadcastCatch.setRequest((SIPRequest) evt.getRequest()); audioBroadcastManager.update(audioBroadcastCatch); + String waiteStreamTimeoutTaskKey = "waite-stream-" + audioBroadcastCatch.getDeviceId() + audioBroadcastCatch.getChannelId(); + dynamicTask.stop(waiteStreamTimeoutTaskKey); } logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); } else { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 3d71c293..eabfd1af 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; @@ -20,7 +22,9 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; +import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItemLite; import com.genersoft.iot.vmp.service.IMediaServerService; @@ -90,6 +94,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private ZLMRTPServerFactory zlmrtpServerFactory; + @Autowired + private ZLMRESTfulUtils zlmresTfulUtils; + @Autowired private IMediaServerService mediaServerService; @@ -674,7 +681,19 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements subscribeKey.put("mediaServerId", mediaServerItem.getId()); String finalSsrc = ssrc; // 流已经存在时直接推流 - if (zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream)) { + JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtsp", stream); + JSONArray tracks = mediaInfo.getJSONArray("tracks"); + Integer codecId = null; + if (tracks != null && tracks.size() > 0) { + for (int i = 0; i < tracks.size(); i++) { + MediaItem.MediaTrack track = JSON.toJavaObject((JSON)tracks.get(i),MediaItem.MediaTrack.class); + if (track.getCodecType() == 1) { + codecId = track.getCodecId(); + break; + } + } + } + if ((mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online"))) { logger.info("发现已经在推流"); sendRtpItem.setStatus(2); redisCatchStorage.updateSendRTPSever(sendRtpItem); @@ -684,9 +703,40 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements content.append("s=Play\r\n"); content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("t=0 0\r\n"); - content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); + if (codecId == null) { + if (mediaTransmissionTCP) { + content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n"); + }else { + content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); + } + + content.append("a=rtpmap:8 PCMA/8000\r\n"); + }else { + if (codecId == 4) { + if (mediaTransmissionTCP) { + content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 0\r\n"); + }else { + content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 0\r\n"); + } + content.append("a=rtpmap:0 PCMU/8000\r\n"); + }else { + if (mediaTransmissionTCP) { + content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n"); + }else { + content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); + } + content.append("a=rtpmap:8 PCMA/8000\r\n"); + } + } + if (sendRtpItem.isTcp()) { + content.append("a=connection:new\r\n"); + if (!sendRtpItem.isTcpActive()) { + content.append("a=setup:active\r\n"); + }else { + content.append("a=setup:passive\r\n"); + } + } content.append("a=sendonly\r\n"); - content.append("a=rtpmap:8 PCMA/8000\r\n"); content.append("y="+ finalSsrc + "\r\n"); content.append("f=v/////a/1/8/1\r\n"); @@ -727,9 +777,22 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } }, 20*1000); + boolean finalMediaTransmissionTCP = mediaTransmissionTCP; subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ logger.info("收到语音对讲推流"); + MediaItem mediaItem = JSON.toJavaObject(json, MediaItem.class); + Integer audioCodecId = null; + if (mediaItem.getTracks() != null && mediaItem.getTracks().size() > 0) { + for (int i = 0; i < mediaItem.getTracks().size(); i++) { + MediaItem.MediaTrack mediaTrack = mediaItem.getTracks().get(i); + if (mediaTrack.getCodecType() == 1) { + audioCodecId = mediaTrack.getCodecId(); + break; + } + } + } + try { sendRtpItem.setStatus(2); redisCatchStorage.updateSendRTPSever(sendRtpItem); @@ -739,9 +802,40 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements content.append("s=Play\r\n"); content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("t=0 0\r\n"); - content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); + if (audioCodecId == null) { + if (finalMediaTransmissionTCP) { + content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n"); + }else { + content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); + } + + content.append("a=rtpmap:8 PCMA/8000\r\n"); + }else { + if (audioCodecId == 4) { + if (finalMediaTransmissionTCP) { + content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 0\r\n"); + }else { + content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 0\r\n"); + } + content.append("a=rtpmap:0 PCMU/8000\r\n"); + }else { + if (finalMediaTransmissionTCP) { + content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n"); + }else { + content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); + } + content.append("a=rtpmap:8 PCMA/8000\r\n"); + } + } content.append("a=sendonly\r\n"); - content.append("a=rtpmap:8 PCMA/8000\r\n"); + if (sendRtpItem.isTcp()) { + content.append("a=connection:new\r\n"); + if (!sendRtpItem.isTcpActive()) { + content.append("a=setup:active\r\n"); + }else { + content.append("a=setup:passive\r\n"); + } + } content.append("y="+ finalSsrc + "\r\n"); content.append("f=v/////a/1/8/1\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 4b2bf487..75f55cf2 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -253,6 +253,10 @@ public class ZLMRESTfulUtils { return sendPost(mediaServerItem, "startSendRtp",param, null); } + public JSONObject startSendRtpPassive(MediaServerItem mediaServerItem, Map param) { + return sendPost(mediaServerItem, "startSendRtpPassive",param, null); + } + public JSONObject stopSendRtp(MediaServerItem mediaServerItem, Map param) { return sendPost(mediaServerItem, "stopSendRtp",param, null); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 3a91f6a0..34918ae1 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -249,6 +249,13 @@ public class ZLMRTPServerFactory { return zlmresTfulUtils.startSendRtp(mediaServerItem, param); } + /** + * 调用zlm RESTFUL API —— startSendRtpPassive + */ + public JSONObject startSendRtpPassive(MediaServerItem mediaServerItem, Mapparam) { + return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param); + } + /** * 查询待转推的流是否就绪 */