diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 47e51baa..3db903bb 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -69,6 +69,7 @@ public class VideoManagerConstants { public static final String SYSTEM_INFO_NET_PREFIX = "VMP_SYSTEM_INFO_NET_"; public static final String SYSTEM_INFO_DISK_PREFIX = "VMP_SYSTEM_INFO_DISK_"; + public static final String BROADCAST_WAITE_INVITE = "task_broadcast_waite_invite_"; diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index 581ea6f8..564fb646 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -43,6 +43,8 @@ public class UserSetting { private Boolean syncChannelOnDeviceOnline = Boolean.FALSE; + private Boolean pushStreamAfterAck = Boolean.FALSE; + private String serverId = "000000"; private String thirdPartyGBIdReg = "[\\s\\S]*"; @@ -206,4 +208,12 @@ public class UserSetting { public void setBroadcastForPlatform(String broadcastForPlatform) { this.broadcastForPlatform = broadcastForPlatform; } + + public Boolean getPushStreamAfterAck() { + return pushStreamAfterAck; + } + + public void setPushStreamAfterAck(Boolean pushStreamAfterAck) { + this.pushStreamAfterAck = pushStreamAfterAck; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java index 072d0cbc..5acbf8eb 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java @@ -83,4 +83,19 @@ public class AudioBroadcastManager { return audioBroadcastCatch; } + + public List get(String deviceId) { + List audioBroadcastCatchList= new ArrayList<>(); + if (SipUtils.isFrontEnd(deviceId)) { + audioBroadcastCatchList.add(data.get(deviceId)); + }else { + for (String key : data.keySet()) { + if (key.startsWith(deviceId)) { + audioBroadcastCatchList.add(data.get(key)); + } + } + } + + return audioBroadcastCatchList; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java index ddb51699..f3dfa966 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java @@ -1,14 +1,10 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request; -import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; -import gov.nist.javax.sip.stack.SIPServerTransactionImpl; import org.apache.commons.lang3.ArrayUtils; import org.dom4j.Document; import org.dom4j.DocumentException; @@ -17,14 +13,14 @@ import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.security.core.parameters.P; import javax.sip.*; import javax.sip.address.Address; import javax.sip.address.AddressFactory; import javax.sip.address.SipURI; -import javax.sip.header.*; +import javax.sip.header.ContentTypeHeader; +import javax.sip.header.ExpiresHeader; +import javax.sip.header.HeaderFactory; import javax.sip.message.MessageFactory; import javax.sip.message.Request; import javax.sip.message.Response; @@ -157,7 +153,10 @@ public abstract class SIPRequestProcessorParent { responseAckExtraParam.content = sdp; responseAckExtraParam.sipURI = sipURI; - return responseAck(request, Response.OK, null, responseAckExtraParam); + SIPResponse sipResponse = responseAck(request, Response.OK, null, responseAckExtraParam); + + + return sipResponse; } /** @@ -190,7 +189,8 @@ public abstract class SIPRequestProcessorParent { reader.setEncoding(charset); // 对海康出现的未转义字符做处理。 String[] destStrArray = new String[]{"<",">","&","'","""}; - char despChar = '&'; // 或许可扩展兼容其他字符 + // 或许可扩展兼容其他字符 + char despChar = '&'; byte destBye = (byte) despChar; List result = new ArrayList<>(); byte[] rawContent = request.getRawContent(); @@ -220,4 +220,5 @@ public abstract class SIPRequestProcessorParent { return xml.getRootElement(); } + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 10b0daa7..189976d1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -1,24 +1,18 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; -import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; -import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -29,15 +23,15 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; -import javax.sip.SipException; import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; import java.text.ParseException; +import java.util.HashMap; +import java.util.Map; /** * SIP命令类型: ACK请求 @@ -46,7 +40,7 @@ import java.text.ParseException; @Component public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { - private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class); + private final Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class); private final String method = "ACK"; @Autowired @@ -73,32 +67,21 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private IMediaServerService mediaServerService; - @Autowired - private ZlmHttpHookSubscribe subscribe; - @Autowired private DynamicTask dynamicTask; - @Autowired - private ISIPCommander cmder; - - @Autowired - private IDeviceService deviceService; - - @Autowired - private ISIPCommanderForPlatform commanderForPlatform; - - @Autowired - private AudioBroadcastManager audioBroadcastManager; - @Autowired private RedisGbPlayMsgListener redisGbPlayMsgListener; + @Autowired + private UserSetting userSetting; + + @Autowired + private IPlayService playService; + /** * 处理 ACK请求 - * - * @param evt */ @Override public void process(RequestEvent evt) { @@ -106,44 +89,45 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); logger.info("[收到ACK]: platformGbId->{}", platformGbId); - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId); - // 取消设置的超时任务 - dynamicTask.stop(callIdHeader.getCallId()); - String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); - if (sendRtpItem == null) { - logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId); - return; - } - String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(), - sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); - if (mediaInfo == null) { - RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( - sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), - sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), - sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); - redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { - startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, json, callIdHeader); - }); - }else { - JSONObject startSendRtpStreamResult = zlmrtpServerFactory.startSendRtp(mediaInfo, sendRtpItem); - if (startSendRtpStreamResult != null) { - startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, callIdHeader); + if (userSetting.getPushStreamAfterAck()) { + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId); + // 取消设置的超时任务 + dynamicTask.stop(callIdHeader.getCallId()); + String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); + if (sendRtpItem == null) { + logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId); + return; + } + String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(), + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); + if (mediaInfo == null) { + RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( + sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), + sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); + redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { + startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, json, callIdHeader); + }); + }else { + JSONObject startSendRtpStreamResult = zlmrtpServerFactory.startSendRtp(mediaInfo, sendRtpItem); + if (startSendRtpStreamResult != null) { + startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, callIdHeader); + } } } } - private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, - JSONObject jsonObject, CallIdHeader callIdHeader) { + JSONObject jsonObject, Map param, CallIdHeader callIdHeader) { if (jsonObject == null) { logger.error("RTP推流失败: 请检查ZLM服务"); } else if (jsonObject.getInteger("code") == 0) { logger.info("调用ZLM推流接口, 结果: {}", jsonObject); - logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getIp(), sendRtpItem.getPort()); + logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); } else { - logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(sendRtpItem)); + logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param)); if (sendRtpItem.isOnlyAudio()) { Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); @@ -152,17 +136,12 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); } catch (SipException | ParseException | InvalidArgumentException | SsrcTransactionNotFoundException e) { - logger.error("[命令发送失败] 停止语音喊话: {}", e.getMessage()); + logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage()); } } - }else { - // 向上级平台 - try { - commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); - } } } + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 6e188a58..e3149620 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; @@ -439,18 +440,23 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements try { // 超时未收到Ack应该回复bye,当前等待时间为10秒 - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("Ack 等待超时"); - mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()); - // 回复bye - try { - cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); - } - }, 60 * 1000); - responseSdpAck(request, content.toString(), platform); + if (userSetting.getPushStreamAfterAck()) { + dynamicTask.startDelay(callIdHeader.getCallId(), () -> { + logger.info("Ack 等待超时"); + mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()); + // 回复bye + try { + cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + } + }, 60 * 1000); + } + SIPResponse sipResponse = responseSdpAck(request, content.toString(), platform); + if (!userSetting.getPushStreamAfterAck()) { + playService.startPushStream(sendRtpItem, sipResponse, platform, request.getCallIdHeader()); + } } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { @@ -878,7 +884,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements content.append("f=\r\n"); try { - return responseSdpAck(request, content.toString(), platform); + SIPResponse sipResponse = responseSdpAck(request, content.toString(), platform); + if (!userSetting.getPushStreamAfterAck()) { + playService.startPushStream(sendRtpItem, sipResponse, platform, request.getCallIdHeader()); + } + return sipResponse; } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { @@ -905,11 +915,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } if (device != null) { logger.info("收到设备" + requesterId + "的语音广播Invite请求"); - + String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId() + audioBroadcastCatch.getChannelId(); + dynamicTask.stop(key); try { responseAck(request, Response.TRYING); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); + playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); + return; } String contentString = new String(request.getRawContent()); // jainSip不支持y=字段, 移除移除以解析。 @@ -964,11 +977,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(request, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 不支持的媒体格式: {}", e.getMessage()); + playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); + return; } return; } String addressStr = sdp.getOrigin().getAddress(); - logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", requesterId, addressStr, port, ssrc); + logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}, {}", requesterId, addressStr, port, ssrc, + mediaTransmissionTCP ? (tcpActive? "TCP主动":"TCP被动") : "UDP"); MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device); if (mediaServerItem == null) { @@ -977,6 +993,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(request, Response.BUSY_HERE); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 未找到可用的zlm: {}", e.getMessage()); + playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); } return; } @@ -990,13 +1007,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(request, Response.BUSY_HERE); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); + playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); + return; } return; } - sendRtpItem.setTcp(mediaTransmissionTCP); - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } + String app = "broadcast"; String stream = device.getDeviceId() + "_" + audioBroadcastCatch.getChannelId(); @@ -1011,6 +1027,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setUsePs(false); sendRtpItem.setRtcp(false); sendRtpItem.setOnlyAudio(true); + sendRtpItem.setTcp(mediaTransmissionTCP); + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); @@ -1023,11 +1044,13 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(request, Response.GONE); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 语音通话 回复410失败, {}", e.getMessage()); + return; } playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); } } catch (SdpException e) { logger.error("[SDP解析异常]", e); + playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); } } else { logger.warn("来自无效设备/平台的请求"); @@ -1084,6 +1107,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements audioBroadcastCatch.setSipTransactionInfoByRequset(sipResponse); audioBroadcastManager.update(audioBroadcastCatch); + // 开启发流,大华在收到200OK后就会开始建立连接 + if (!userSetting.getPushStreamAfterAck()) { + playService.startPushStream(sendRtpItem, sipResponse, parentPlatform, request.getCallIdHeader()); + } + } catch (SipException | InvalidArgumentException | ParseException | SdpParseException e) { logger.error("[命令发送失败] 语音喊话 回复200OK(SDP): {}", e.getMessage()); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java index 2cf20726..1c0dd582 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java @@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatchStatus; import com.genersoft.iot.vmp.gb28181.bean.Device; @@ -9,6 +11,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; +import com.genersoft.iot.vmp.service.IPlayService; import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.Element; import org.slf4j.Logger; @@ -35,11 +38,14 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i private ResponseMessageHandler responseMessageHandler; @Autowired - private DeferredResultHolder deferredResultHolder; + private DynamicTask dynamicTask; @Autowired private AudioBroadcastManager audioBroadcastManager; + @Autowired + private IPlayService playService; + @Override public void afterPropertiesSet() throws Exception { responseMessageHandler.addHandler(cmdType, this); @@ -47,6 +53,8 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { + + SIPRequest request = (SIPRequest) evt.getRequest(); try { String channelId = getText(rootElement, "DeviceID"); if (!audioBroadcastManager.exit(device.getDeviceId(), channelId)) { @@ -55,12 +63,23 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i return; } String result = getText(rootElement, "Result"); - logger.info("收到语音广播的回复 {}:{}/{}", result, device.getDeviceId(), channelId ); - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId); - audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite); - audioBroadcastManager.update(audioBroadcastCatch); + logger.info("[语音广播]回复:{}, {}/{}", result, device.getDeviceId(), channelId ); + // 回复200 OK - responseAck((SIPRequest) evt.getRequest(), Response.OK); + responseAck(request, Response.OK); + if (result.equalsIgnoreCase("OK")) { + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId); + audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite); + audioBroadcastManager.update(audioBroadcastCatch); + // 等待invite消息, 超时则结束 + String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId() + channelId; + dynamicTask.startDelay(key, ()->{ + logger.info("[语音广播]等待invite消息超时:{}/{}", device.getDeviceId(), channelId); + playService.stopAudioBroadcast(device.getDeviceId(), channelId); + }, 2000); + }else { + playService.stopAudioBroadcast(device.getDeviceId(), channelId); + } } catch (ParseException | SipException | InvalidArgumentException e) { logger.error("[命令发送失败] 国标级联 语音喊话: {}", e.getMessage()); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 761481be..2129ee12 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; public class CatalogResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { private Logger logger = LoggerFactory.getLogger(CatalogResponseMessageHandler.class); + private final String cmdType = "Catalog"; @Autowired diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 51ff7adc..fd4f391e 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -323,7 +323,7 @@ public class ZLMHttpHookListener { }); if ("rtsp".equals(param.getSchema())){ - logger.info("on_stream_changed:注册->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream()); + logger.info("流变化:注册->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream()); if (param.isRegist()) { mediaServerService.addCount(param.getMediaServerId()); }else { @@ -383,10 +383,10 @@ public class ZLMHttpHookListener { } }else { - logger.info("[语音喊话] 推流指向的·通道{}未找到", channelId); + logger.info("[语音对讲] 未找到通道:{}", channelId); } - }else { - logger.info("[语音喊话] 推流指向的·设备{}未找到", deviceId); + }else{ + logger.info("[语音对讲] 未找到设备:{}", deviceId); } }else { logger.info("[语音喊话] 推流格式有误, 格式为: broadcast/设备编号_通道编号 "); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 3eb61f54..fc7d90af 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -36,7 +36,7 @@ public class ZLMRESTfulUtils { // 设置连接超时时间 httpClientBuilder.connectTimeout(5,TimeUnit.SECONDS); // 设置读取超时时间 - httpClientBuilder.readTimeout(5,TimeUnit.SECONDS); + httpClientBuilder.readTimeout(15,TimeUnit.SECONDS); // 设置连接池 httpClientBuilder.connectionPool(new ConnectionPool(16, 5, TimeUnit.MINUTES)); if (logger.isDebugEnabled()) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 3f7e13de..a24f4f46 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -3,9 +3,7 @@ package com.genersoft.iot.vmp.service; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.exception.ServiceException; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback; -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -15,11 +13,14 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import gov.nist.javax.sip.message.SIPResponse; import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; import javax.sip.SipException; +import javax.sip.header.CallIdHeader; import java.text.ParseException; +import java.util.Map; /** * 点播处理 @@ -64,4 +65,9 @@ public interface IPlayService { void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException; void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException; + + void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader); + + void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform, + JSONObject jsonObject, Map param, CallIdHeader callIdHeader); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index d25e537e..eb208d32 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -3,12 +3,15 @@ package com.genersoft.iot.vmp.service.impl; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; +import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; @@ -32,9 +35,7 @@ import javax.sip.InvalidArgumentException; import javax.sip.SipException; import java.text.ParseException; import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.*; import java.util.concurrent.TimeUnit; /** @@ -89,6 +90,12 @@ public class DeviceServiceImpl implements IDeviceService { @Autowired private IMediaServerService mediaServerService; + @Autowired + private AudioBroadcastManager audioBroadcastManager; + + @Autowired + private ZLMRESTfulUtils zlmresTfulUtils; + @Override public void online(Device device) { logger.info("[设备上线] deviceId:{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort()); @@ -183,6 +190,23 @@ public class DeviceServiceImpl implements IDeviceService { // 移除订阅 removeCatalogSubscribe(device); removeMobilePositionSubscribe(device); + List audioBroadcastCatches = audioBroadcastManager.get(deviceId); + if (audioBroadcastCatches.size() > 0) { + for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatches) { + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null); + if (sendRtpItem != null) { + redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Map param = new HashMap<>(); + param.put("vhost", "__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStreamId()); + zlmresTfulUtils.stopSendRtp(mediaInfo, param); + } + + audioBroadcastManager.del(deviceId, audioBroadcastCatch.getChannelId()); + } + } } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 5e62981a..8ce0ae86 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -24,16 +24,15 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IPlayService; -import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; -import com.genersoft.iot.vmp.service.bean.PlayBackCallback; -import com.genersoft.iot.vmp.service.bean.PlayBackResult; -import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.service.bean.*; +import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -42,6 +41,7 @@ import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; +import gov.nist.javax.sip.message.SIPResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -54,13 +54,11 @@ import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; import javax.sip.SipException; +import javax.sip.header.CallIdHeader; import java.math.BigDecimal; import java.math.RoundingMode; import java.text.ParseException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; @SuppressWarnings(value = {"rawtypes", "unchecked"}) @Service @@ -119,11 +117,20 @@ public class PlayServiceImpl implements IPlayService { @Autowired private ZlmHttpHookSubscribe subscribe; + @Autowired + private ISIPCommanderForPlatform commanderForPlatform; + @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; + @Autowired + private RedisGbPlayMsgListener redisGbPlayMsgListener; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + @Override public void play(MediaServerItem mediaServerItem, String deviceId, String channelId, @@ -1024,8 +1031,20 @@ public class PlayServiceImpl implements IPlayService { return false; } // 查询通道使用状态 - if (audioBroadcastInUse(device, channelId)) { - return false; + if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); + if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { + // 查询流是否存在,不存在则认为是异常状态 + MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStreamId()); + if (streamReady) { + logger.warn("语音广播已经开启: {}", channelId); + event.call("语音广播已经开启"); + return; + } else { + stopAudioBroadcast(device.getDeviceId(), channelId); + } + } } // 发送通知 @@ -1063,28 +1082,31 @@ public class PlayServiceImpl implements IPlayService { @Override public void stopAudioBroadcast(String deviceId, String channelId) { - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(deviceId, channelId); - if (audioBroadcastCatch != null) { + List audioBroadcastCatchList = new ArrayList<>(); + if (channelId == null) { + audioBroadcastCatchList.addAll(audioBroadcastManager.get(deviceId)); + }else { + audioBroadcastCatchList.add(audioBroadcastManager.get(deviceId, channelId)); + } + if (audioBroadcastCatchList.size() > 0) { + for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatchList) { + Device device = deviceService.getDevice(deviceId); + if (device == null || audioBroadcastCatch == null ) { + return; + } + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null); + if (sendRtpItem != null) { + redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Map param = new HashMap<>(); + param.put("vhost", "__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStreamId()); + zlmresTfulUtils.stopSendRtp(mediaInfo, param); + } - Device device = deviceService.getDevice(deviceId); - if (device == null) { - return; + audioBroadcastManager.del(deviceId, channelId); } - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null); - if (sendRtpItem != null) { - redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null); - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - Map param = new HashMap<>(); - param.put("vhost", "__defaultVhost__"); - param.put("app", sendRtpItem.getApp()); - param.put("stream", sendRtpItem.getStreamId()); - zlmresTfulUtils.stopSendRtp(mediaInfo, param); - } - if (audioBroadcastCatch.isFromPlatform()) { - // TODO 向上级发送BYE结束语音喊话 - } - - audioBroadcastManager.del(deviceId, channelId); } } @@ -1187,4 +1209,100 @@ public class PlayServiceImpl implements IPlayService { Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); cmder.playResumeCmd(device, streamInfo); } + + @Override + public void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader) { + + // 开始发流 + // 取消设置的超时任务 +// String channelId = request.getCallIdHeader().getCallId(); + + String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(), + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + param.put("is_udp", is_Udp); + if (!sendRtpItem.isTcp()) { + // udp模式下开启rtcp保活 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + } + + if (mediaInfo == null) { + RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( + sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), + sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); + redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { + startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader); + }); + } else { + // 如果是非严格模式,需要关闭端口占用 + JSONObject startSendRtpStreamResult = null; + if (sendRtpItem.getLocalPort() != 0) { + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId()); + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); + if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); + }else { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + } + } + }else { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); + }else { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + } + } + if (startSendRtpStreamResult != null) { + startSendRtpStreamHand(sendRtpItem, platform, startSendRtpStreamResult, param, callIdHeader); + } + } + } + + @Override + public void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform, + JSONObject jsonObject, Map param, CallIdHeader callIdHeader) { + if (jsonObject == null) { + logger.error("RTP推流失败: 请检查ZLM服务"); + } else if (jsonObject.getInteger("code") == 0) { + logger.info("调用ZLM推流接口, 结果: {}", jsonObject); + logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); + } else { + logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param)); + if (sendRtpItem.isOnlyAudio()) { + Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + if (audioBroadcastCatch != null) { + try { + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); + } catch (SipException | ParseException | InvalidArgumentException | + SsrcTransactionNotFoundException e) { + logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage()); + } + } + }else { + // 向上级平台 + try { + commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + } + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index 377b20f2..e8997cba 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; -import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -90,12 +89,6 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { @Autowired private PlatformGbStreamMapper platformGbStreamMapper; - @Autowired - private IGbStreamService gbStreamService; - - @Autowired - private ParentPlatformMapper parentPlatformMapper; - /** * 根据设备ID判断设备是否存在 * diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index 92c2acb2..1b4d0af2 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -197,6 +197,8 @@ user-settings: sync-channel-on-device-online: false # 国标级联语音喊话发流模式 * UDP:udp传输 TCP-ACTIVE:tcp主动模式 TCP-PASSIVE:tcp被动模式 broadcast-for-platform: UDP + # 收到ack消息后开始发流,默认false, 回复200ok后直接开始发流 + push-stream-after-ack: false # 关闭在线文档(生产环境建议关闭) springdoc: