diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 189976d1..e0451a06 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -10,6 +10,8 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; @@ -29,7 +31,6 @@ import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; -import java.text.ParseException; import java.util.HashMap; import java.util.Map; @@ -99,49 +100,62 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId); return; } - String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; + String isUdp = sendRtpItem.isTcp() ? "0" : "1"; MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + param.put("is_udp", isUdp); + if (!sendRtpItem.isTcp()) { + // udp模式下开启rtcp保活 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + } + if (mediaInfo == null) { RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { - startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, json, callIdHeader); + playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, json, param, callIdHeader); }); - }else { - JSONObject startSendRtpStreamResult = zlmrtpServerFactory.startSendRtp(mediaInfo, sendRtpItem); - if (startSendRtpStreamResult != null) { - startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, callIdHeader); - } - } - } - } - private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, - JSONObject jsonObject, Map param, CallIdHeader callIdHeader) { - if (jsonObject == null) { - logger.error("RTP推流失败: 请检查ZLM服务"); - } else if (jsonObject.getInteger("code") == 0) { - logger.info("调用ZLM推流接口, 结果: {}", jsonObject); - logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); - } else { - logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param)); - if (sendRtpItem.isOnlyAudio()) { - Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - if (audioBroadcastCatch != null) { - try { - cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); - } catch (SipException | ParseException | InvalidArgumentException | - SsrcTransactionNotFoundException e) { - logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage()); + } else { + // 如果是非严格模式,需要关闭端口占用 + JSONObject startSendRtpStreamResult = null; + if (sendRtpItem.getLocalPort() != 0) { + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId()); + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); + if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); + }else { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + } + } + }else { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); + }else { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); } } + if (startSendRtpStreamResult != null) { + playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader); + } } } - } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 8ce0ae86..79226a7a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -1035,12 +1035,12 @@ public class PlayServiceImpl implements IPlayService { SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { // 查询流是否存在,不存在则认为是异常状态 - MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStreamId()); + MediaServerItem mediaServerItemInUse = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItemInUse, sendRtpItem.getApp(), sendRtpItem.getStreamId()); if (streamReady) { logger.warn("语音广播已经开启: {}", channelId); event.call("语音广播已经开启"); - return; + return false; } else { stopAudioBroadcast(device.getDeviceId(), channelId); }