From 06bbe3fe01e5af9486c309693a975077df813f7c Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 29 Sep 2022 16:27:59 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E7=AC=AC=E4=BA=8C=E7=A7=8D?= =?UTF-8?q?=E8=AF=AD=E9=9F=B3=E5=AF=B9=E8=AE=B2=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 - .../SsrcTransactionNotFoundException.java | 6 +- .../vmp/gb28181/bean/AudioBroadcastCatch.java | 24 +- .../vmp/gb28181/bean/InviteStreamType.java | 2 +- .../iot/vmp/gb28181/event/SipSubscribe.java | 3 +- .../gb28181/transmit/cmd/ISIPCommander.java | 32 +- .../cmd/SIPRequestHeaderProvider.java | 33 ++ .../transmit/cmd/impl/SIPCommander.java | 142 ++++--- .../cmd/impl/SIPCommanderFroPlatform.java | 2 +- .../request/SIPRequestProcessorParent.java | 14 +- .../request/impl/AckRequestProcessor.java | 40 +- .../request/impl/ByeRequestProcessor.java | 154 +++---- .../request/impl/InviteRequestProcessor.java | 191 +++------ .../impl/RegisterRequestProcessor.java | 22 +- .../impl/SubscribeRequestProcessor.java | 4 - .../impl/message/MessageHandlerAbstract.java | 18 +- .../cmd/BroadcastResponseMessageHandler.java | 5 +- .../iot/vmp/gb28181/utils/SipUtils.java | 3 + .../vmp/media/zlm/ZLMHttpHookListener.java | 153 ++++++- .../vmp/media/zlm/ZLMRTPServerFactory.java | 27 -- .../vmp/media/zlm/ZlmHttpHookSubscribe.java | 2 + .../media/zlm/dto/HookSubscribeFactory.java | 16 + .../zlm/dto/HookSubscribeForStreamPush.java | 42 ++ .../iot/vmp/service/IPlayService.java | 13 +- .../vmp/service/impl/MediaServiceImpl.java | 2 +- .../iot/vmp/service/impl/PlayServiceImpl.java | 386 +++++++++++++----- .../vmanager/gb28181/play/PlayController.java | 61 +-- web_src/config/index.js | 4 +- .../src/components/dialog/devicePlayer.vue | 61 +-- 29 files changed, 882 insertions(+), 582 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamPush.java diff --git a/pom.xml b/pom.xml index 25bdb818..94c8277c 100644 --- a/pom.xml +++ b/pom.xml @@ -273,7 +273,6 @@ yyyyMMdd - org.apache.maven.plugins maven-surefire-plugin @@ -282,7 +281,6 @@ true - diff --git a/src/main/java/com/genersoft/iot/vmp/conf/exception/SsrcTransactionNotFoundException.java b/src/main/java/com/genersoft/iot/vmp/conf/exception/SsrcTransactionNotFoundException.java index 4038cd11..05015184 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/exception/SsrcTransactionNotFoundException.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/exception/SsrcTransactionNotFoundException.java @@ -39,12 +39,12 @@ public class SsrcTransactionNotFoundException extends Exception{ @Override public String getMessage() { StringBuffer msg = new StringBuffer(); - msg.append(StringFormatter.format("缓存事务信息未找到,device:%s channel: %s ", deviceId, channelId)); + msg.append(String.format("缓存事务信息未找到,device:%s channel: %s ", deviceId, channelId)); if (callId != null) { - msg.append("callId: " + callId); + msg.append(",callId: " + callId); } if (stream != null) { - msg.append("stream: " + stream); + msg.append(",stream: " + stream); } return msg.toString(); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java index 93a0d162..e2adddce 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.bean; import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.message.SIPResponse; import gov.nist.javax.sip.stack.SIPDialog; import javax.sip.Dialog; @@ -40,12 +41,7 @@ public class AudioBroadcastCatch { /** * 请求信息 */ - private SIPRequest request; - - /** - * 会话信息 - */ - private SIPDialog dialog; + private SipTransactionInfo sipTransactionInfo; public String getDeviceId() { @@ -72,19 +68,15 @@ public class AudioBroadcastCatch { this.status = status; } - public void setDialog(SIPDialog dialog) { - this.dialog = dialog; + public SipTransactionInfo getSipTransactionInfo() { + return sipTransactionInfo; } - public SIPDialog getDialog() { - return dialog; + public void setSipTransactionInfo(SipTransactionInfo sipTransactionInfo) { + this.sipTransactionInfo = sipTransactionInfo; } - public SIPRequest getRequest() { - return request; - } - - public void setRequest(SIPRequest request) { - this.request = request; + public void setSipTransactionInfoByRequset(SIPResponse response) { + this.sipTransactionInfo = new SipTransactionInfo(response); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamType.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamType.java index 24d509fd..a3098fb7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamType.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamType.java @@ -2,7 +2,7 @@ package com.genersoft.iot.vmp.gb28181.bean; public enum InviteStreamType { - PLAY,PLAYBACK,PUSH,PROXY,CLOUD_RECORD_PUSH,CLOUD_RECORD_PROXY + PLAY,PLAYBACK,PUSH,PROXY,CLOUD_RECORD_PUSH,CLOUD_RECORD_PROXY,TALK } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java index e5b995e4..a4ec52a3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.event; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent; import gov.nist.javax.sip.message.SIPRequest; import org.slf4j.Logger; @@ -57,7 +58,7 @@ public class SipSubscribe { logger.debug("errorSubscribes.size:{}",errorSubscribes.size()); } - public interface Event { void response(EventResult eventResult) ; + public interface Event { void response(EventResult eventResult); } /** diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index 860560da..f95dc134 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -8,18 +8,12 @@ import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import gov.nist.javax.sip.message.SIPRequest; -import gov.nist.javax.sip.message.SIPRequest; -import gov.nist.javax.sip.stack.SIPDialog; -import javax.sip.Dialog; -import javax.sip.InvalidArgumentException; -import javax.sip.SipException; -import java.text.ParseException; import javax.sip.InvalidArgumentException; import javax.sip.PeerUnavailableException; import javax.sip.SipException; -import javax.sip.message.Request; import java.text.ParseException; +import javax.sip.message.Request; /** * @description:设备能力接口,用于定义设备的控制、查询能力 @@ -130,14 +124,18 @@ public interface ISIPCommander { String startTime, String endTime, int downloadSpeed, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; + /** * 视频流停止 */ - void streamByeCmd(String deviceId, String channelId, String stream, String callId, SipSubscribe.Event okEvent); - void streamByeCmd(String deviceId, String channelId, String stream, String callId); void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException; + + void talkStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String callId, ZlmHttpHookSubscribe.Event event, ZlmHttpHookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; + void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException; + void streamByeCmd(Device device, String channelId, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException; + /** * 回放暂停 */ @@ -168,22 +166,12 @@ public interface ISIPCommander { /** - - /** - * 语音广播 - * - * @param device 视频设备 - */ - boolean audioBroadcastCmd(Device device, String channelId, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent); - void audioBroadcastCmd(Device device,String channelId); - - /** + * /** * 语音广播 * - * @param device 视频设备 + * @param device 视频设备 */ - void audioBroadcastCmd(Device device, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException; - void audioBroadcastCmd(Device device) throws InvalidArgumentException, SipException, ParseException; + void audioBroadcastCmd(Device device, String channelId, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; /** * 音视频录像控制 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java index 1374ddf5..1f4f73c6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java @@ -326,4 +326,37 @@ public class SIPRequestHeaderProvider { return request; } + + public Request createBroadcastMessageRequest(Device device, String channelId, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException { + Request request = null; + // sipuri + SipURI requestURI = sipFactory.createAddressFactory().createSipURI(channelId, device.getHostAddress()); + // via + ArrayList viaHeaders = new ArrayList(); + ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(sipConfig.getIp(), sipConfig.getPort(), device.getTransport(), viaTag); + viaHeader.setRPort(); + viaHeaders.add(viaHeader); + // from + SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getDomain()); + Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI); + FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, fromTag); + // to + SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(channelId, device.getHostAddress()); + Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI); + ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, toTag); + + // Forwards + MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); + // ceq + CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.MESSAGE); + + ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); + + request = sipFactory.createMessageFactory().createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader, + toHeader, viaHeaders, maxForwards, contentTypeHeader, content); + + request.addHeader(SipUtils.createUserAgentHeader(sipFactory, gitUtil)); + + return request; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 8d70f2ff..5d3caf79 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; -import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; @@ -12,45 +11,32 @@ import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamPush; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.GitUtil; -import gov.nist.javax.sip.SIPConstants; import gov.nist.javax.sip.SipProviderImpl; -import gov.nist.javax.sip.SipStackImpl; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; -import gov.nist.javax.sip.stack.SIPClientTransaction; -import gov.nist.javax.sip.stack.SIPClientTransactionImpl; -import gov.nist.javax.sip.stack.SIPDialog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.DependsOn; -import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import javax.sip.*; -import javax.sip.address.Address; -import javax.sip.address.SipURI; import javax.sip.header.*; import javax.sip.message.Request; -import javax.sip.message.Response; -import java.lang.reflect.Field; import java.text.ParseException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; /** * @description:设备能力接口,用于定义设备的控制、查询能力 @@ -98,6 +84,9 @@ public class SIPCommander implements ISIPCommander { @Autowired private IMediaServerService mediaServerService; + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; + /** * 云台方向放控制,使用配置文件中的默认镜头移动速度 @@ -591,11 +580,73 @@ public class SIPCommander implements ISIPCommander { }); } + @Override + public void talkStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String callId, ZlmHttpHookSubscribe.Event event, ZlmHttpHookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { + + String stream = ssrcInfo.getStream(); + + if (device == null) { + return; + } + if (!mediaServerItem.isRtpEnable()) { + // 单端口暂不支持语音对讲 + logger.info("[语音对讲] 单端口暂不支持此操作"); + return; + } + + logger.info("[语音对讲] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); + subscribe.addSubscribe(hookSubscribeForStreamChange, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { + if (event != null) { + event.response(mediaServerItemInUse, json); + subscribe.removeSubscribe(hookSubscribeForStreamChange); + } + }); + + CallIdHeader callIdHeader = device.getTransport().equalsIgnoreCase("TCP") ? tcpSipProvider.getNewCallId() + : udpSipProvider.getNewCallId(); + callIdHeader.setCallId(callId); + HookSubscribeForStreamPush hookSubscribeForStreamPush = HookSubscribeFactory.on_publish("rtp", stream, null, mediaServerItem.getId()); + subscribe.addSubscribe(hookSubscribeForStreamPush, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { + if (eventForPush != null) { + eventForPush.response(mediaServerItemInUse, json); + } + }); + // + StringBuffer content = new StringBuffer(200); + content.append("v=0\r\n"); + content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); + content.append("s=Talk\r\n"); + content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); + content.append("t=0 0\r\n"); + + content.append("m=audio " + ssrcInfo.getPort() + " RTP/AVP 8\r\n"); + content.append("a=sendrecv\r\n"); + content.append("a=rtpmap:8 PCMA/8000\r\n"); + + content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc + // f字段:f= v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率 + content.append("f=v/////a/1/8/1" + "\r\n"); + + Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(), callIdHeader); + transmitRequest(device.getTransport(), request, (e -> { + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + errorEvent.response(e); + }), e -> { + // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 + ResponseEvent responseEvent = (ResponseEvent) e.event; + SIPResponse response = (SIPResponse) responseEvent.getResponse(); + streamSession.put(device.getDeviceId(), channelId, "talk", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.play); + okEvent.response(e); + }); + } + /** * 视频流停止, 不使用回调 */ @Override - public void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException { + public synchronized void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException { streamByeCmd(device, channelId, stream, callId, null); } @@ -603,7 +654,7 @@ public class SIPCommander implements ISIPCommander { * 视频流停止 */ @Override - public void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException { + public synchronized void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException { SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callId, stream); if (ssrcTransaction == null) { throw new SsrcTransactionNotFoundException(device.getDeviceId(), channelId, callId, stream); @@ -617,67 +668,34 @@ public class SIPCommander implements ISIPCommander { transmitRequest(device.getTransport(), byteRequest, null, okEvent); } - /** + @Override + public synchronized void streamByeCmd(Device device, String channelId, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException { + Request byteRequest = headerProvider.createByteRequest(device, channelId, sipTransactionInfo); + transmitRequest(device.getTransport(), byteRequest, null, okEvent); + } + + /** * 语音广播 * * @param device 视频设备 */ - @Override - public void audioBroadcastCmd(Device device) throws InvalidArgumentException, SipException, ParseException { - - StringBuffer broadcastXml = new StringBuffer(200); - String charset = device.getCharset(); - broadcastXml.append("\r\n"); - broadcastXml.append("\r\n"); - broadcastXml.append("Broadcast\r\n"); - broadcastXml.append("" + (int) ((Math.random() * 9 + 1) * 100000) + "\r\n"); - broadcastXml.append("" + sipConfig.getId() + "\r\n"); - broadcastXml.append("" + device.getDeviceId() + "\r\n"); - broadcastXml.append("\r\n"); - /** - * 语音广播 - * - * @param device 视频设备 - */ @Override - public boolean audioBroadcastCmd(Device device,String channelId, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) { - try { - StringBuffer broadcastXml = new StringBuffer(200); - String charset = device.getCharset(); - broadcastXml.append("\r\n"); - broadcastXml.append("\r\n"); - broadcastXml.append("Broadcast\r\n"); - broadcastXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); - broadcastXml.append("" + sipConfig.getId() + "\r\n"); - broadcastXml.append("" + channelId + "\r\n"); - broadcastXml.append("\r\n"); - - CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() - : udpSipProvider.getNewCallId(); - - Request request = headerProvider.createMessageRequest(device, broadcastXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, callIdHeader); - transmitRequest(device.getTransport(), request); - - } - - @Override - public void audioBroadcastCmd(Device device, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { - + public void audioBroadcastCmd(Device device, String channelId, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { StringBuffer broadcastXml = new StringBuffer(200); String charset = device.getCharset(); broadcastXml.append("\r\n"); broadcastXml.append("\r\n"); broadcastXml.append("Broadcast\r\n"); - broadcastXml.append("" + (int) ((Math.random() * 9 + 1) * 100000) + "\r\n"); + broadcastXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); broadcastXml.append("" + sipConfig.getId() + "\r\n"); - broadcastXml.append("" + device.getDeviceId() + "\r\n"); + broadcastXml.append("" + channelId + "\r\n"); broadcastXml.append("\r\n"); CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); Request request = headerProvider.createMessageRequest(device, broadcastXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, callIdHeader); - transmitRequest(device.getTransport(), request, errorEvent); + transmitRequest(device.getTransport(), request, errorEvent, okEvent); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 7712350a..0ab542bb 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -676,7 +676,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } @Override - public void streamByeCmd(ParentPlatform platform, SendRtpItem sendRtpItem) throws SipException, InvalidArgumentException, ParseException { + public synchronized void streamByeCmd(ParentPlatform platform, SendRtpItem sendRtpItem) throws SipException, InvalidArgumentException, ParseException { if (sendRtpItem == null ) { logger.info("[向上级发送BYE], sendRtpItem 为NULL"); return; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java index c005f2c0..c61e2c10 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java @@ -1,12 +1,12 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request; +import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.SipStackImpl; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; -import gov.nist.javax.sip.stack.SIPServerTransaction; import gov.nist.javax.sip.stack.SIPServerTransactionImpl; import org.apache.commons.lang3.ArrayUtils; import org.dom4j.Document; @@ -27,8 +27,6 @@ import javax.sip.message.MessageFactory; import javax.sip.message.Request; import javax.sip.message.Response; import java.io.ByteArrayInputStream; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; @@ -51,6 +49,9 @@ public abstract class SIPRequestProcessorParent { @Qualifier(value="udpSipProvider") private SipProviderImpl udpSipProvider; + @Autowired + private SipConfig sipConfig; + /** * 根据 RequestEvent 获取 ServerTransaction * @param evt @@ -60,13 +61,15 @@ public abstract class SIPRequestProcessorParent { Request request = evt.getRequest(); SIPServerTransactionImpl serverTransaction = (SIPServerTransactionImpl)evt.getServerTransaction(); // 判断TCP还是UDP + boolean isTcp = false; ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME); String transport = reqViaHeader.getTransport(); + if (transport.equalsIgnoreCase("TCP")) { + isTcp = true; + } if (serverTransaction != null && serverTransaction.getOriginalRequest() == null) { serverTransaction.setOriginalRequest((SIPRequest) evt.getRequest()); } - boolean isTcp = "TCP".equals(transport); - if (serverTransaction == null) { try { if (isTcp) { @@ -187,7 +190,6 @@ public abstract class SIPRequestProcessorParent { * 回复带sdp的200 */ public SIPResponse responseSdpAck(ServerTransaction serverTransaction, String sdp, ParentPlatform platform) throws SipException, InvalidArgumentException, ParseException { - ContentTypeHeader contentTypeHeader = SipFactory.getInstance().createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP"); // 兼容国标中的使用编码@域名作为RequestURI的情况 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 9bc10a6e..3e7fbeb2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; @@ -14,6 +15,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; @@ -21,7 +23,6 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.stack.SIPDialog; -import com.genersoft.iot.vmp.utils.SerializeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -81,6 +82,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private ISIPCommander cmder; + @Autowired + private IDeviceService deviceService; + @Autowired private ISIPCommanderForPlatform commanderForPlatform; @@ -106,7 +110,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In // 取消设置的超时任务 dynamicTask.stop(callIdHeader.getCallId()); String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId()); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); if (sendRtpItem == null) { logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId); return; @@ -123,7 +127,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In param.put("pt", sendRtpItem.getPt()); param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - if (!sendRtpItem.isTcp() && parentPlatform.isRtcp()) { + if (!sendRtpItem.isTcp() && parentPlatform != null && parentPlatform.isRtcp()) { // 开启rtcp保活 param.put("udp_rtcp_timeout", "1"); } @@ -141,29 +145,28 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In if (jsonObject == null) { logger.error("RTP推流失败: 请检查ZLM服务"); } else if (jsonObject.getInteger("code") == 0) { - - if (sendRtpItem.isOnlyAudio()) { - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok); - audioBroadcastCatch.setDialog((SIPDialog) evt.getDialog()); - audioBroadcastCatch.setRequest((SIPRequest) evt.getRequest()); - audioBroadcastManager.update(audioBroadcastCatch); - String waiteStreamTimeoutTaskKey = "waite-stream-" + audioBroadcastCatch.getDeviceId() + audioBroadcastCatch.getChannelId(); - dynamicTask.stop(waiteStreamTimeoutTaskKey); - } logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); } else { logger.error("RTP推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSONObject.toJSON(param)); if (sendRtpItem.isOnlyAudio()) { // 语音对讲 - try { - cmder.streamByeCmd((SIPDialog) evt.getDialog(), sendRtpItem.getChannelId(), (SIPRequest) evt.getRequest(), null); - } catch (SipException | ParseException | InvalidArgumentException e) { - logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage()); + Device device = deviceService.queryDevice(platformGbId); + if (device != null) { + try { + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), sendRtpItem.getStreamId(), null); + } catch (SipException | ParseException | InvalidArgumentException | + SsrcTransactionNotFoundException e) { + logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage()); + } } + } else { // 向上级平台 - commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); + try { + commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联, 回复BYE: {}", e.getMessage()); + } } if (mediaInfo == null) { RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( @@ -179,7 +182,6 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In } - } } } private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index a20030f1..1192b074 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -2,10 +2,8 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; +import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; @@ -54,6 +52,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private IDeviceService deviceService; + @Autowired + private AudioBroadcastManager audioBroadcastManager; + @Autowired private IVideoManagerStorage storager; @@ -91,78 +92,79 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In logger.error("[回复BYE信息失败],{}", e.getMessage()); } CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); - String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); - String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId()); - logger.info("[收到bye] {}/{}", platformGbId, channelId); - if (sendRtpItem != null){ - String streamId = sendRtpItem.getStreamId(); - Map param = new HashMap<>(); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",streamId); - param.put("ssrc",sendRtpItem.getSsrc()); - logger.info("[收到bye] 停止向上级推流:{}", streamId); - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null); - zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); - int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); - if (totalReaderCount <= 0) { - logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId); - if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { - Device device = deviceService.queryDevice(sendRtpItem.getDeviceId()); - if (device == null) { - logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); - } - try { - logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), channelId); - cmder.streamByeCmd(device, channelId, streamId, null); - } catch (InvalidArgumentException | ParseException | SipException | - SsrcTransactionNotFoundException e) { - logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage()); - } - } - if (sendRtpItem.isOnlyAudio()) { - playService.stopAudioBroadcast(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - } - if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), - sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId()); - redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); - } - } - } - // 可能是设备主动停止 - Device device = storager.queryVideoDeviceByChannelId(platformGbId); - if (device != null) { - storager.stopPlay(device.getDeviceId(), channelId); - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); - if (streamInfo != null) { - redisCatchStorage.stopPlay(streamInfo); - mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream()); - } - SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null); - if (ssrcTransactionForPlay != null){ - if (ssrcTransactionForPlay.getCallId().equals(callIdHeader.getCallId())){ - // 释放ssrc - MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlay.getMediaServerId()); - if (mediaServerItem != null) { - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlay.getSsrc()); - } - streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream()); - } - } - SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callIdHeader.getCallId(), null); - if (ssrcTransactionForPlayBack != null) { - // 释放ssrc - MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlayBack.getMediaServerId()); - if (mediaServerItem != null) { - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlayBack.getSsrc()); - } - streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlayBack.getStream()); - } - } + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); + if (sendRtpItem != null){ + logger.info("[收到bye] {}/{}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId()); + String streamId = sendRtpItem.getStreamId(); + Map param = new HashMap<>(); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",streamId); + param.put("ssrc",sendRtpItem.getSsrc()); + logger.info("[收到bye] 停止向上级推流:{}", streamId); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), callIdHeader.getCallId(), null); + zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); + int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); + if (totalReaderCount <= 0) { + logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId); + if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { + Device device = deviceService.queryDevice(sendRtpItem.getDeviceId()); + if (device == null) { + logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); + } + try { + logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null); + } catch (InvalidArgumentException | ParseException | SipException | + SsrcTransactionNotFoundException e) { + logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage()); + } + } + + if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId()); + redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); + } + } + playService.stopAudioBroadcast(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + } + + String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); + String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); + + // 可能是设备主动停止 + Device device = storager.queryVideoDeviceByChannelId(platformGbId); + if (device != null) { + storager.stopPlay(device.getDeviceId(), channelId); + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); + if (streamInfo != null) { + redisCatchStorage.stopPlay(streamInfo); + mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream()); + } + SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null); + if (ssrcTransactionForPlay != null){ + if (ssrcTransactionForPlay.getCallId().equals(callIdHeader.getCallId())){ + // 释放ssrc + MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlay.getMediaServerId()); + if (mediaServerItem != null) { + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlay.getSsrc()); + } + streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream()); + } + } + SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callIdHeader.getCallId(), null); + if (ssrcTransactionForPlayBack != null) { + // 释放ssrc + MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlayBack.getMediaServerId()); + if (mediaServerItem != null) { + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlayBack.getSsrc()); + } + streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlayBack.getStream()); + } + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 49a44523..78adfd16 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -1,11 +1,10 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; @@ -14,7 +13,8 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; @@ -30,20 +30,16 @@ import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; -import com.genersoft.iot.vmp.utils.SerializeUtils; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import gov.nist.javax.sdp.TimeDescriptionImpl; import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sip.message.SIPRequest; -import gov.nist.javax.sip.stack.SIPDialog; -import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +68,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements private final String method = "INVITE"; @Autowired - private SIPCommanderFroPlatform cmderFroPlatform; + private ISIPCommanderForPlatform cmderFroPlatform; @Autowired private IVideoManagerStorage storager; @@ -174,7 +170,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 查询请求是否来自上级平台\设备 ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); if (platform == null) { - inviteFromDeviceHandle(serverTransaction, requesterId); + inviteFromDeviceHandle(serverTransaction, requesterId, channelId); } else { // 查询平台下是否有该通道 DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); @@ -393,14 +389,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements }; SipSubscribe.Event errorEvent = ((event) -> { // 未知错误。直接转发设备点播的错误 - Response response = null; try { - response = getMessageFactory().createResponse(event.statusCode, evt.getRequest()); + Response response = getMessageFactory().createResponse(event.statusCode, evt.getRequest()); serverTransaction.sendResponse(response); System.out.println("未知错误。直接转发设备点播的错误"); if (serverTransaction.getDialog() != null) { serverTransaction.getDialog().delete(); } + serverTransaction.getDialog().delete(); + } catch (ParseException | SipException | InvalidArgumentException e) { e.printStackTrace(); } @@ -817,7 +814,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } public void inviteFromDeviceHandle(ServerTransaction serverTransaction, String requesterId, String channelId) throws InvalidArgumentException, ParseException, SipException, SdpException { - // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) Device device = redisCatchStorage.getDevice(requesterId); AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(requesterId, channelId); @@ -918,125 +914,64 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setOnlyAudio(true); redisCatchStorage.updateSendRTPSever(sendRtpItem); - // hook监听等待设备推流上来 - // 添加订阅 - HookSubscribeForStreamChange subscribeKey = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId()); - - String finalSsrc = ssrc; - // 流已经存在时直接推流 - // 设置等待推流的超时; 默认20s - String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + audioBroadcastCatch.getChannelId(); - dynamicTask.startDelay(waiteStreamTimeoutTaskKey, ()->{ - logger.info("等待推流超时: {}/{}", app, stream); - subscribe.removeSubscribe(subscribeKey); - playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); - // 发送bye - try { - responseAck(evt, Response.BUSY_HERE); - } catch (SipException e) { - throw new RuntimeException(e); - } catch (InvalidArgumentException e) { - throw new RuntimeException(e); - } catch (ParseException e) { - throw new RuntimeException(e); - } - }, 20*1000); - - boolean finalMediaTransmissionTCP = mediaTransmissionTCP; - subscribe.addSubscribe(subscribeKey, - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ - logger.info("收到语音对讲推流"); - dynamicTask.stop(waiteStreamTimeoutTaskKey); - MediaItem mediaItem = JSON.toJavaObject(json, MediaItem.class); - Integer audioCodecId = null; - if (mediaItem.getTracks() != null && mediaItem.getTracks().size() > 0) { - for (int i = 0; i < mediaItem.getTracks().size(); i++) { - MediaItem.MediaTrack mediaTrack = mediaItem.getTracks().get(i); - if (mediaTrack.getCodecType() == 1) { - audioCodecId = mediaTrack.getCodecId(); - break; - } - } - } - - try { - sendRtpItem.setStatus(2); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - StringBuffer content = new StringBuffer(200); - content.append("v=0\r\n"); - content.append("o="+ config.getId() +" "+ sdp.getOrigin().getSessionId() +" " + sdp.getOrigin().getSessionVersion() + " IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); - content.append("s=Play\r\n"); - content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); - content.append("t=0 0\r\n"); - if (audioCodecId == null) { - if (finalMediaTransmissionTCP) { - content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n"); - }else { - content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); - } - - content.append("a=rtpmap:8 PCMA/8000\r\n"); - }else { - if (audioCodecId == 4) { - if (finalMediaTransmissionTCP) { - content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 0\r\n"); - }else { - content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 0\r\n"); - } - content.append("a=rtpmap:0 PCMU/8000\r\n"); - }else { - if (finalMediaTransmissionTCP) { - content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n"); - }else { - content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); - } - content.append("a=rtpmap:8 PCMA/8000\r\n"); - } - } - content.append("a=sendonly\r\n"); - if (sendRtpItem.isTcp()) { - content.append("a=connection:new\r\n"); - if (!sendRtpItem.isTcpActive()) { - content.append("a=setup:active\r\n"); - }else { - content.append("a=setup:passive\r\n"); - } - } - content.append("y="+ finalSsrc + "\r\n"); - content.append("f=v/////a/1/8/1\r\n"); - - ParentPlatform parentPlatform = new ParentPlatform(); - parentPlatform.setServerIP(device.getIp()); - parentPlatform.setServerPort(device.getPort()); - parentPlatform.setServerGBId(device.getDeviceId()); - - responseSdpAck(serverTransaction, content.toString(), parentPlatform); - Dialog dialog = evt.getDialog(); - audioBroadcastCatch.setDialog((SIPDialog) dialog); - audioBroadcastCatch.setRequest((SIPRequest) request); - audioBroadcastManager.update(audioBroadcastCatch); - } catch (SipException | InvalidArgumentException | ParseException | SdpParseException e) { - logger.error("[命令发送失败] 语音对讲: {}", e.getMessage()); - } - }); -// } - String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId(); - WVPResult wvpResult = new WVPResult<>(); - wvpResult.setCode(0); - wvpResult.setMsg("success"); - AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult(); - audioBroadcastResult.setApp(app); - audioBroadcastResult.setStream(stream); - audioBroadcastResult.setStreamInfo(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null,false)); - audioBroadcastResult.setCodec("G.711"); - wvpResult.setData(audioBroadcastResult); - RequestMessage requestMessage = new RequestMessage(); - requestMessage.setKey(key); - requestMessage.setData(wvpResult); - resultHolder.invokeAllResult(requestMessage); + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream); + if (streamReady) { + sendOk(device, sendRtpItem, sdp, serverTransaction, mediaServerItem, mediaTransmissionTCP, ssrc); + }else { + logger.warn("[语音通话], 未发现待推送的流,app={},stream={}", app, stream); + playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); + } } else { logger.warn("来自无效设备/平台的请求"); responseAck(serverTransaction, Response.BAD_REQUEST); } } + + void sendOk(Device device, SendRtpItem sendRtpItem, SessionDescription sdp, ServerTransaction serverTransaction, MediaServerItem mediaServerItem, boolean mediaTransmissionTCP, String ssrc){ + try { + sendRtpItem.setStatus(2); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + StringBuffer content = new StringBuffer(200); + content.append("v=0\r\n"); + content.append("o="+ config.getId() +" "+ sdp.getOrigin().getSessionId() +" " + sdp.getOrigin().getSessionVersion() + " IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); + content.append("s=Play\r\n"); + content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); + content.append("t=0 0\r\n"); + + if (mediaTransmissionTCP) { + content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n"); + }else { + content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); + } + + content.append("a=rtpmap:8 PCMA/8000/1\r\n"); + + content.append("a=sendonly\r\n"); + if (sendRtpItem.isTcp()) { + content.append("a=connection:new\r\n"); + if (!sendRtpItem.isTcpActive()) { + content.append("a=setup:active\r\n"); + }else { + content.append("a=setup:passive\r\n"); + } + } + content.append("y="+ ssrc + "\r\n"); + content.append("f=v/////a/1/8/1\r\n"); + + ParentPlatform parentPlatform = new ParentPlatform(); + parentPlatform.setServerIP(device.getIp()); + parentPlatform.setServerPort(device.getPort()); + parentPlatform.setServerGBId(device.getDeviceId()); + + SIPResponse sipResponse = responseSdpAck(serverTransaction, content.toString(), parentPlatform); + + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), sendRtpItem.getChannelId()); + + audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok); + audioBroadcastCatch.setSipTransactionInfoByRequset(sipResponse); + audioBroadcastManager.update(audioBroadcastCatch); + } catch (SipException | InvalidArgumentException | ParseException | SdpParseException e) { + logger.error("[命令发送失败] 语音对讲 回复200OK(SDP): {}", e.getMessage()); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java index 8bab8f53..e23177ad 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java @@ -20,12 +20,8 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; -import javax.sip.InvalidArgumentException; -import javax.sip.RequestEvent; -import javax.sip.ServerTransaction; -import javax.sip.SipException; +import javax.sip.*; import javax.sip.header.*; import javax.sip.message.Request; import javax.sip.message.Response; @@ -116,10 +112,12 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen if (expiresHeader == null) { response = getMessageFactory().createResponse(Response.BAD_REQUEST, request); - ServerTransaction serverTransaction = getServerTransaction(evt); - serverTransaction.sendResponse(response); - if (serverTransaction.getDialog() != null) { - serverTransaction.getDialog().delete(); + if (evt.getDialog() != null ) { + if (evt.getDialog().isServer()) { + ServerTransaction serverTransaction = getServerTransaction(evt); + serverTransaction.sendResponse(response); + serverTransaction.getDialog().delete(); + } } return; } @@ -176,19 +174,13 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen } catch (SipException | InvalidArgumentException | NoSuchAlgorithmException | ParseException e) { e.printStackTrace(); } - } private void sendResponse(RequestEvent evt, Response response) throws InvalidArgumentException, SipException { ServerTransaction serverTransaction = getServerTransaction(evt); - if (serverTransaction == null) { - logger.warn("[回复失败]:{}", response); - return; - } serverTransaction.sendResponse(response); if (serverTransaction.getDialog() != null) { serverTransaction.getDialog().delete(); } } - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java index 2ec40238..56dce36d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java @@ -1,20 +1,16 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.CmdType; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; -import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeHandlerTask; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.message.SIPRequest; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java index cf7d1122..18c6824c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java @@ -3,11 +3,15 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.AckRequestProcessor; import org.dom4j.Element; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; +import javax.sip.ServerTransaction; import javax.sip.SipException; import javax.sip.message.Response; import java.text.ParseException; @@ -18,6 +22,8 @@ import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent implements IMessageHandler{ + private Logger logger = LoggerFactory.getLogger(MessageHandlerAbstract.class); + public Map messageHandlerMap = new ConcurrentHashMap<>(); public void addHandler(String cmdType, IMessageHandler messageHandler) { @@ -48,14 +54,10 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i public void handNullCmd(RequestEvent evt){ try { - responseAck(evt, Response.OK); - } catch (SipException e) { - throw new RuntimeException(e); - } catch (InvalidArgumentException e) { - throw new RuntimeException(e); - } catch (ParseException e) { - throw new RuntimeException(e); + ServerTransaction serverTransaction = getServerTransaction(evt); + responseAck(serverTransaction, Response.OK); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 回复200 OK: {}", e.getMessage()); } - return; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java index e86e3fd8..5470374d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java @@ -52,16 +52,17 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i public void handForDevice(RequestEvent evt, Device device, Element rootElement) { try { String channelId = getText(rootElement, "DeviceID"); + ServerTransaction serverTransaction = getServerTransaction(evt); if (!audioBroadcastManager.exit(device.getDeviceId(), channelId)) { // 回复410 - responseAck(evt, Response.GONE); + responseAck(serverTransaction, Response.GONE); return; } logger.info("收到语音广播的回复:{}/{}", device.getDeviceId(), channelId ); AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId); audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite); audioBroadcastManager.update(audioBroadcastCatch); - responseAck(evt, Response.OK); + responseAck(serverTransaction, Response.OK); } catch (ParseException | SipException | InvalidArgumentException e) { logger.error("[命令发送失败] 国标级联 语音喊话: {}", e.getMessage()); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java index bf2a0eef..d97450fd 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java @@ -113,4 +113,7 @@ public class SipUtils { return builder.toString(); } + public static String getNewCallId() { + return (int) Math.floor(Math.random() * 10000) + ""; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index b043cadf..5194a9aa 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -11,12 +11,16 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; +import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -51,7 +55,13 @@ public class ZLMHttpHookListener { private SIPCommander cmder; @Autowired - private SIPCommanderFroPlatform commanderFroPlatform; + private ISIPCommanderForPlatform commanderFroPlatform; + + @Autowired + private AudioBroadcastManager audioBroadcastManager; + + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; @Autowired private IPlayService playService; @@ -466,7 +476,127 @@ public class ZLMHttpHookListener { streamInfo.getStream(), null); } } - }else { + }else if ("broadcast".equals(app)){ + // 语音对讲推流 stream需要满足格式deviceId_channelId + if (regist && stream.indexOf("_") > 0) { + String[] streamArray = stream.split("_"); + if (streamArray.length == 2) { + String deviceId = streamArray[0]; + String channelId = streamArray[1]; + Device device = deviceService.queryDevice(deviceId); + if (device != null) { + DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); + if (deviceChannel != null) { + if (audioBroadcastManager.exit(deviceId, channelId)) { + // 直接推流 + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, stream, null); + if (sendRtpItem == null) { + // TODO 可能数据错误,重新开启语音通道 + }else { + String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + logger.info("rtp/{}开始向上级推流, 目标={}:{},SSRC={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc()); + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + + JSONObject jsonObject; + if (sendRtpItem.isTcpActive()) { + jsonObject = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); + } else { + param.put("is_udp", is_Udp); + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + } + if (jsonObject != null && jsonObject.getInteger("code") == 0) { + logger.info("[语音对讲] 自动推流成功, device: {}, channel: {}", deviceId, channelId); + } + + } + }else { + // 开启语音对讲通道 + try { + playService.audioBroadcastCmd(device, channelId, 60, (msg)->{ + logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId); + }); + } catch (InvalidArgumentException | ParseException | SipException e) { + logger.error("[命令发送失败] 语音对讲: {}", e.getMessage()); + } + } + + } + } + } + } + + }else if ("talk".equals(app)){ + // 语音对讲推流 stream需要满足格式deviceId_channelId + if (regist && stream.indexOf("_") > 0) { + String[] streamArray = stream.split("_"); + if (streamArray.length == 2) { + String deviceId = streamArray[0]; + String channelId = streamArray[1]; + Device device = deviceService.queryDevice(deviceId); + if (device != null) { + DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); + if (deviceChannel != null) { + if (audioBroadcastManager.exit(deviceId, channelId)) { + // 直接推流 + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, stream, null); + if (sendRtpItem == null) { + // TODO 可能数据错误,重新开启语音通道 + }else { + String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + logger.info("rtp/{}开始向上级推流, 目标={}:{},SSRC={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc()); + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + + JSONObject jsonObject; + if (sendRtpItem.isTcpActive()) { + jsonObject = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); + } else { + param.put("is_udp", is_Udp); + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + } + if (jsonObject != null && jsonObject.getInteger("code") == 0) { + logger.info("[语音对讲] 自动推流成功, device: {}, channel: {}", deviceId, channelId); + } + } + }else { + // 开启语音对讲通道 + MediaServerItem mediaServerForMinimumLoad = mediaServerService.getMediaServerForMinimumLoad(); + playService.talk(mediaServerForMinimumLoad, device, channelId, (mediaServerItem, jsonObject)->{ + System.out.println("开始推流"); + }, eventResult -> { + System.out.println(eventResult.msg); + }, ()->{ + System.out.println("超时"); + }); + } + + } + } + } + } + + }else{ if (!"rtp".equals(app)){ String type = OriginType.values()[item.getOriginType()].getType(); MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); @@ -521,10 +651,23 @@ public class ZLMHttpHookListener { if (sendRtpItem.getApp().equals(app)) { String platformId = sendRtpItem.getPlatformId(); ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); + Device device = deviceService.queryDevice(platformId); try { - commanderFroPlatform.streamByeCmd(platform, sendRtpItem); - } catch (SipException | InvalidArgumentException | ParseException e) { + if (platform != null) { + commanderFroPlatform.streamByeCmd(platform, sendRtpItem); + }else { + if (sendRtpItem.isOnlyAudio()) { + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + if (device != null && audioBroadcastCatch != null) { +// cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); + } + }else { + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), stream, sendRtpItem.getCallId()); + } + + } + } catch (SipException | InvalidArgumentException | ParseException | SsrcTransactionNotFoundException e) { logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 80ae95ed..2bd9a952 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -164,33 +164,6 @@ public class ZLMRTPServerFactory { return result; } -// private int getPortFromportRange(MediaServerItem mediaServerItem) { -// int currentPort = mediaServerItem.getCurrentPort(); -// if (currentPort == 0) { -// String[] portRangeStrArray = mediaServerItem.getSendRtpPortRange().split(","); -// if (portRangeStrArray.length != 2) { -// portRangeArray[0] = 30000; -// portRangeArray[1] = 30500; -// }else { -// portRangeArray[0] = Integer.parseInt(portRangeStrArray[0]); -// portRangeArray[1] = Integer.parseInt(portRangeStrArray[1]); -// } -// } -// -// if (currentPort == 0 || currentPort++ > portRangeArray[1]) { -// currentPort = portRangeArray[0]; -// mediaServerItem.setCurrentPort(currentPort); -// return portRangeArray[0]; -// } else { -// if (currentPort % 2 == 1) { -// currentPort++; -// } -// currentPort++; -// mediaServerItem.setCurrentPort(currentPort); -// return currentPort; -// } -// } - /** * 创建一个国标推流 * @param ip 推流ip diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZlmHttpHookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZlmHttpHookSubscribe.java index 19ffdc49..5cc2a665 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZlmHttpHookSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZlmHttpHookSubscribe.java @@ -38,6 +38,7 @@ public class ZlmHttpHookSubscribe { hookSubscribe.setExpires(expiresInstant); } allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event); + System.out.println(allSubscribes); } public ZlmHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) { @@ -48,6 +49,7 @@ public class ZlmHttpHookSubscribe { } for (IHookSubscribe key : eventMap.keySet()) { Boolean result = null; + for (String s : key.getContent().keySet()) { if (result == null) { result = key.getContent().getString(s).equals(hookResponse.getString(s)); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java index 92172f3a..b8449181 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java @@ -24,10 +24,26 @@ public class HookSubscribeFactory { return hookSubscribe; } + public static HookSubscribeForStreamPush on_publish(String app, String stream, String scheam, String mediaServerId) { + HookSubscribeForStreamPush hookSubscribe = new HookSubscribeForStreamPush(); + JSONObject subscribeKey = new com.alibaba.fastjson.JSONObject(); + subscribeKey.put("app", app); + subscribeKey.put("stream", stream); + if (scheam != null) { + subscribeKey.put("schema", scheam); + } + subscribeKey.put("mediaServerId", mediaServerId); + hookSubscribe.setContent(subscribeKey); + + return hookSubscribe; + } + + public static HookSubscribeForServerStarted on_server_started() { HookSubscribeForServerStarted hookSubscribe = new HookSubscribeForServerStarted(); hookSubscribe.setContent(new JSONObject()); return hookSubscribe; } + } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamPush.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamPush.java new file mode 100644 index 00000000..48e2e3c0 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamPush.java @@ -0,0 +1,42 @@ +package com.genersoft.iot.vmp.media.zlm.dto; + +import com.alibaba.fastjson.JSONObject; + +import java.time.Instant; + +/** + * hook订阅-开始推流 + * @author lin + */ +public class HookSubscribeForStreamPush implements IHookSubscribe{ + + private HookType hookType = HookType.on_publish; + + private JSONObject content; + + private Instant expires; + + @Override + public HookType getHookType() { + return hookType; + } + + @Override + public JSONObject getContent() { + return content; + } + + public void setContent(JSONObject content) { + this.content = content; + } + + @Override + public Instant getExpires() { + return expires; + } + + @Override + public void setExpires(Instant expires) { + this.expires = expires; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 9a2bb58e..1eac2c81 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -11,11 +11,16 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; import com.genersoft.iot.vmp.service.bean.PlayBackCallback; import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import org.springframework.web.context.request.async.DeferredResult; +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; +import java.text.ParseException; + /** * 点播处理 */ @@ -23,6 +28,10 @@ public interface IPlayService { void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid); + void talk(MediaServerItem mediaServerItem, Device device, String channelId, + ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, + Runnable timeoutCallback); + void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback, String uuid); @@ -44,6 +53,8 @@ public interface IPlayService { void zlmServerOnline(String mediaServerId); - void audioBroadcast(Device device, String channelId, int timeout, AudioBroadcastEvent event); + AudioBroadcastResult audioBroadcast(Device device, String channelId); void stopAudioBroadcast(String deviceId, String channelId); + + void audioBroadcastCmd(Device device, String channelId, int timeout, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index 734a6740..4785afe5 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -69,7 +69,7 @@ public class MediaServiceImpl implements IMediaService { JSONObject mediaJSON = JSON.parseObject(JSON.toJSONString(data.get(0)), JSONObject.class); JSONArray tracks = mediaJSON.getJSONArray("tracks"); if (authority) { - streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr, calld); + streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr, calld, true); }else { streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr,null, true); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index d6c3d5f1..68055a4d 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -12,8 +12,10 @@ import javax.sip.SipException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; -import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.service.IDeviceService; +import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +38,6 @@ import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; @@ -54,33 +55,10 @@ import com.genersoft.iot.vmp.service.bean.PlayBackResult; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; -import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; -import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; -import gov.nist.javax.sip.stack.SIPDialog; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.stereotype.Service; -import org.springframework.util.ResourceUtils; -import org.springframework.web.context.request.async.DeferredResult; - -import javax.sip.ResponseEvent; -import javax.sip.SipException; -import java.io.FileNotFoundException; -import java.math.BigDecimal; -import java.text.ParseException; -import java.math.RoundingMode; -import java.util.*; -import java.util.stream.Collectors; -import java.util.stream.Stream; - @SuppressWarnings(value = {"rawtypes", "unchecked"}) @Service public class PlayServiceImpl implements IPlayService { @@ -97,7 +75,10 @@ public class PlayServiceImpl implements IPlayService { private AudioBroadcastManager audioBroadcastManager; @Autowired - private SIPCommanderFroPlatform sipCommanderFroPlatform; + private IDeviceService deviceService; + + @Autowired + private ISIPCommanderForPlatform sipCommanderFroPlatform; @Autowired private IRedisCatchStorage redisCatchStorage; @@ -123,10 +104,6 @@ public class PlayServiceImpl implements IPlayService { @Autowired private VideoStreamSessionManager streamSession; - - @Autowired - private IDeviceService deviceService; - @Autowired private UserSetting userSetting; @@ -145,7 +122,6 @@ public class PlayServiceImpl implements IPlayService { private ThreadPoolTaskExecutor taskExecutor; - @Override public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, @@ -169,15 +145,15 @@ public class PlayServiceImpl implements IPlayService { StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); playResult.setDevice(device); - result.onCompletion(()->{ + result.onCompletion(() -> { // 点播结束时调用截图接口 - taskExecutor.execute(()->{ + taskExecutor.execute(() -> { // TODO 应该在上流时调用更好,结束也可能是错误结束 - String path = "snap"; - String fileName = deviceId + "_" + channelId + ".jpg"; - WVPResult wvpResult = (WVPResult)result.getResult(); + String path = "snap"; + String fileName = deviceId + "_" + channelId + ".jpg"; + WVPResult wvpResult = (WVPResult) result.getResult(); if (Objects.requireNonNull(wvpResult).getCode() == 0) { - StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData(); + StreamInfo streamInfoForSuccess = (StreamInfo) wvpResult.getData(); MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId()); String streamUrl = streamInfoForSuccess.getFmp4(); // 请求截图 @@ -201,7 +177,7 @@ public class PlayServiceImpl implements IPlayService { MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId); - if(rtpInfo.getInteger("code") == 0){ + if (rtpInfo.getInteger("code") == 0) { if (rtpInfo.getBoolean("exist")) { int localPort = rtpInfo.getInteger("local_port"); if (localPort == 0) { @@ -214,7 +190,7 @@ public class PlayServiceImpl implements IPlayService { resultHolder.invokeAllResult(msg); return playResult; - }else { + } else { WVPResult wvpResult = new WVPResult(); wvpResult.setCode(ErrorCode.SUCCESS.getCode()); wvpResult.setMsg(ErrorCode.SUCCESS.getMsg()); @@ -227,12 +203,12 @@ public class PlayServiceImpl implements IPlayService { } } - }else { + } else { redisCatchStorage.stopPlay(streamInfo); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); streamInfo = null; } - }else { + } else { //zlm连接失败 redisCatchStorage.stopPlay(streamInfo); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); @@ -246,7 +222,7 @@ public class PlayServiceImpl implements IPlayService { } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false); logger.info(JSONObject.toJSONString(ssrcInfo)); - play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{ + play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response) -> { if (hookEvent != null) { hookEvent.response(mediaServerItem, response); } @@ -260,13 +236,13 @@ public class PlayServiceImpl implements IPlayService { if (errorEvent != null) { errorEvent.response(event); } - }, (code, msgStr)->{ + }, (code, msgStr) -> { // invite点播超时 WVPResult wvpResult = new WVPResult(); wvpResult.setCode(ErrorCode.ERROR100.getCode()); if (code == 0) { wvpResult.setMsg("点播超时,请稍候重试"); - }else if (code == 1) { + } else if (code == 1) { wvpResult.setMsg("收流超时,请稍候重试"); } msg.setData(wvpResult); @@ -277,6 +253,186 @@ public class PlayServiceImpl implements IPlayService { return playResult; } + @Override + public void talk(MediaServerItem mediaServerItem, Device device, String channelId, + ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, + Runnable timeoutCallback) { + String streamId = null; + if (mediaServerItem.isRtpEnable()) { + streamId = String.format("%s_%s", device.getDeviceId(), channelId); + } + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false); + logger.info("[对讲开始] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); + // 超时处理 + String timeOutTaskKey = UUID.randomUUID().toString(); + SSRCInfo finalSsrcInfo = ssrcInfo; + System.out.println("设置超时任务: " + timeOutTaskKey); + dynamicTask.startDelay(timeOutTaskKey, () -> { + + logger.info("[对讲超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, finalSsrcInfo.getPort(), finalSsrcInfo.getSsrc()); + timeoutCallback.run(); + // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 + try { + cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null); + } catch (InvalidArgumentException | ParseException | SipException e) { + logger.error("[对讲超时], 发送BYE失败 {}", e.getMessage()); + } catch (SsrcTransactionNotFoundException e) { + timeoutCallback.run(); + mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); + mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); + streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + } + }, userSetting.getPlayTimeout()); + final String ssrc = ssrcInfo.getSsrc(); + final String stream = ssrcInfo.getStream(); + //端口获取失败的ssrcInfo 没有必要发送点播指令 + if (ssrcInfo.getPort() <= 0) { + logger.info("[对讲] 端口分配异常,deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo); + return; + } + try { + String callId = SipUtils.getNewCallId(); + cmder.talkStreamCmd(mediaServerItem, ssrcInfo, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { + logger.info("[对讲] 流已生成, 开始推流: " + response.toJSONString()); + dynamicTask.stop(timeOutTaskKey); + // TODO 暂不做处理 + }, (MediaServerItem mediaServerItemInuse, JSONObject json) -> { + logger.info("[对讲] 开始推流: " + json.toJSONString()); + dynamicTask.stop(timeOutTaskKey); + // 获取远程IP端口 作为回复语音流的地址 + String ip = json.getString("ip"); + Integer port = json.getInteger("port"); + logger.info("[远端设备开始推流]{}/{}, 来自ip:{}, 端口:{}", device.getDeviceId(), channelId, ip, port); + // 查看平台推流是否就绪 + Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemInuse, "talk", stream); + if (!ready) { + try { + cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null); + } catch (InvalidArgumentException | ParseException | SipException e) { + logger.error("[对讲超时], 发送BYE失败 {}", e.getMessage()); + } catch (SsrcTransactionNotFoundException e) { + timeoutCallback.run(); + mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); + mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); + streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + } + }else { + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, ip, port, ssrcInfo.getSsrc(), device.getDeviceId(), + device.getDeviceId(), channelId, + false); + + sendRtpItem.setTcpActive(false); + if (sendRtpItem == null || sendRtpItem.getLocalPort() == 0) { + logger.warn("服务器端口资源不足"); + try { + cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null); + } catch (InvalidArgumentException | ParseException | SipException e) { + logger.error("[对讲超时], 发送BYE失败 {}", e.getMessage()); + } catch (SsrcTransactionNotFoundException e) { + timeoutCallback.run(); + mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); + mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); + streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + } + return; + } + sendRtpItem.setCallId(callId); + sendRtpItem.setPlayType(InviteStreamType.TALK); + sendRtpItem.setStatus(1); + sendRtpItem.setIp(ip); + sendRtpItem.setPort(port); + sendRtpItem.setTcpActive(false); + sendRtpItem.setStreamId(ssrcInfo.getStream()); + sendRtpItem.setApp("talk"); + sendRtpItem.setSsrc(ssrc); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaServerItemInuse, param); + System.out.println(11111); + System.out.println(jsonObject); + } + + }, (event) -> { +// ResponseEvent responseEvent = (ResponseEvent) event.event; +// String contentString = new String(responseEvent.getResponse().getRawContent()); +// // 获取ssrc +// int ssrcIndex = contentString.indexOf("y="); +// // 检查是否有y字段 +// if (ssrcIndex >= 0) { +// //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容 +// String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); +// // 查询到ssrc不一致且开启了ssrc校验则需要针对处理 +// if (ssrc.equals(ssrcInResponse)) { +// return; +// } +// logger.info("[对讲消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); +// if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { +// logger.info("[对讲消息] SSRC修正 {}->{}", ssrc, ssrcInResponse); +// +// if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) { +// // ssrc 不可用 +// // 释放ssrc +// mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); +// streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); +// event.msg = "下级自定义了ssrc,但是此ssrc不可用"; +// event.statusCode = 400; +// errorEvent.response(event); +// return; +// } +// +// // 单端口模式streamId也有变化,需要重新设置监听 +// if (!mediaServerItem.isRtpEnable()) { +// // 添加订阅 +// HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); +// subscribe.removeSubscribe(hookSubscribe); +// hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); +// subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { +// logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); +// dynamicTask.stop(timeOutTaskKey); +// // hook响应 +// onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid); +// hookEvent.response(mediaServerItemInUse, response); +// }); +// } +// // 关闭rtp server +// mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); +// // 重新开启ssrc server +// mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, finalSsrcInfo.getPort()); +// +// } +// } + }, (event) -> { + dynamicTask.stop(timeOutTaskKey); + mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); + + streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + errorEvent.response(event); + }); + } catch (InvalidArgumentException | SipException | ParseException e) { + + logger.error("[命令发送失败] 对讲消息: {}", e.getMessage()); + dynamicTask.stop(timeOutTaskKey); + mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); + + streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null)); + eventResult.msg = "命令发送失败"; + errorEvent.response(eventResult); + } + } @Override @@ -291,12 +447,12 @@ public class PlayServiceImpl implements IPlayService { if (ssrcInfo == null) { ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false); } - logger.info("[点播开始] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck() ); + logger.info("[点播开始] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); // 超时处理 String timeOutTaskKey = UUID.randomUUID().toString(); SSRCInfo finalSsrcInfo = ssrcInfo; System.out.println("设置超时任务: " + timeOutTaskKey); - dynamicTask.startDelay( timeOutTaskKey,()->{ + dynamicTask.startDelay(timeOutTaskKey, () -> { logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, finalSsrcInfo.getPort(), finalSsrcInfo.getSsrc()); timeoutCallback.run(1, "收流超时"); @@ -315,7 +471,7 @@ public class PlayServiceImpl implements IPlayService { final String ssrc = ssrcInfo.getSsrc(); final String stream = ssrcInfo.getStream(); //端口获取失败的ssrcInfo 没有必要发送点播指令 - if(ssrcInfo.getPort() <= 0){ + if (ssrcInfo.getPort() <= 0) { logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo); return; } @@ -330,7 +486,7 @@ public class PlayServiceImpl implements IPlayService { logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId); }, (event) -> { - ResponseEvent responseEvent = (ResponseEvent)event.event; + ResponseEvent responseEvent = (ResponseEvent) event.event; String contentString = new String(responseEvent.getResponse().getRawContent()); // 获取ssrc int ssrcIndex = contentString.indexOf("y="); @@ -342,7 +498,7 @@ public class PlayServiceImpl implements IPlayService { if (ssrc.equals(ssrcInResponse)) { return; } - logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse ); + logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { logger.info("[点播消息] SSRC修正 {}->{}", ssrc, ssrcInResponse); @@ -363,13 +519,13 @@ public class PlayServiceImpl implements IPlayService { HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); subscribe.removeSubscribe(hookSubscribe); hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{ - logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); - dynamicTask.stop(timeOutTaskKey); - // hook响应 - onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid); - hookEvent.response(mediaServerItemInUse, response); - }); + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { + logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); + dynamicTask.stop(timeOutTaskKey); + // hook响应 + onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid); + hookEvent.response(mediaServerItemInUse, response); + }); } // 关闭rtp server mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); @@ -441,7 +597,7 @@ public class PlayServiceImpl implements IPlayService { MediaServerItem mediaServerItem; if (mediaServerId == null) { mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(); - }else { + } else { mediaServerItem = mediaServerService.getOne(mediaServerId); } if (mediaServerItem == null) { @@ -452,8 +608,8 @@ public class PlayServiceImpl implements IPlayService { @Override public DeferredResult> playBack(String deviceId, String channelId, String startTime, - String endTime,InviteStreamCallback inviteStreamCallback, - PlayBackCallback callback) { + String endTime, InviteStreamCallback inviteStreamCallback, + PlayBackCallback callback) { Device device = storager.queryVideoDevice(deviceId); if (device == null) { return null; @@ -466,9 +622,9 @@ public class PlayServiceImpl implements IPlayService { @Override public DeferredResult> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, - String deviceId, String channelId, String startTime, - String endTime, InviteStreamCallback infoCallBack, - PlayBackCallback playBackCallback) { + String deviceId, String channelId, String startTime, + String endTime, InviteStreamCallback infoCallBack, + PlayBackCallback playBackCallback) { if (mediaServerItem == null || ssrcInfo == null) { return null; } @@ -485,7 +641,7 @@ public class PlayServiceImpl implements IPlayService { requestMessage.setKey(key); PlayBackResult playBackResult = new PlayBackResult<>(); String playBackTimeOutTaskKey = UUID.randomUUID().toString(); - dynamicTask.startDelay(playBackTimeOutTaskKey, ()->{ + dynamicTask.startDelay(playBackTimeOutTaskKey, () -> { logger.warn(String.format("设备回放超时,deviceId:%s ,channelId:%s", deviceId, channelId)); playBackResult.setCode(ErrorCode.ERROR100.getCode()); playBackResult.setMsg("回放超时"); @@ -545,7 +701,7 @@ public class PlayServiceImpl implements IPlayService { cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack, hookEvent, eventResult -> { if (eventResult.type == SipSubscribe.EventResultType.response) { - ResponseEvent responseEvent = (ResponseEvent)eventResult.event; + ResponseEvent responseEvent = (ResponseEvent) eventResult.event; String contentString = new String(responseEvent.getResponse().getRawContent()); // 获取ssrc int ssrcIndex = contentString.indexOf("y="); @@ -557,7 +713,7 @@ public class PlayServiceImpl implements IPlayService { if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { return; } - logger.info("[回放消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse ); + logger.info("[回放消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { logger.info("[回放消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); @@ -578,7 +734,7 @@ public class PlayServiceImpl implements IPlayService { HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); subscribe.removeSubscribe(hookSubscribe); hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{ + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); dynamicTask.stop(playBackTimeOutTaskKey); // hook响应 @@ -614,7 +770,7 @@ public class PlayServiceImpl implements IPlayService { MediaServerItem newMediaServerItem = getNewMediaServerItem(device); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true, true); - return download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed,infoCallBack, hookCallBack); + return download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, infoCallBack, hookCallBack); } @Override @@ -640,7 +796,7 @@ public class PlayServiceImpl implements IPlayService { downloadResult.setData(requestMessage); String downLoadTimeOutTaskKey = UUID.randomUUID().toString(); - dynamicTask.startDelay(downLoadTimeOutTaskKey, ()->{ + dynamicTask.startDelay(downLoadTimeOutTaskKey, () -> { logger.warn(String.format("录像下载请求超时,deviceId:%s ,channelId:%s", deviceId, channelId)); wvpResult.setCode(ErrorCode.ERROR100.getCode()); wvpResult.setMsg("录像下载请求超时"); @@ -723,15 +879,15 @@ public class PlayServiceImpl implements IPlayService { if (duration == 0) { streamInfo.setProgress(0); - }else { + } else { String startTime = streamInfo.getStartTime(); String endTime = streamInfo.getEndTime(); long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime); long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime); - BigDecimal currentCount = new BigDecimal(duration/1000); - BigDecimal totalCount = new BigDecimal(end-start); - BigDecimal divide = currentCount.divide(totalCount,2, RoundingMode.HALF_UP); + BigDecimal currentCount = new BigDecimal(duration / 1000); + BigDecimal totalCount = new BigDecimal(end - start); + BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP); double process = divide.doubleValue(); streamInfo.setProgress(process); } @@ -762,7 +918,7 @@ public class PlayServiceImpl implements IPlayService { public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) { String streamId = resonse.getString("stream"); JSONArray tracks = resonse.getJSONArray("tracks"); - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem,"rtp", streamId, tracks, null); + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", streamId, tracks, null); streamInfo.setDeviceID(deviceId); streamInfo.setChannelId(channelId); return streamInfo; @@ -788,7 +944,7 @@ public class PlayServiceImpl implements IPlayService { List allSsrc = streamSession.getAllSsrc(); if (allSsrc.size() > 0) { for (SsrcTransaction ssrcTransaction : allSsrc) { - if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) { + if (ssrcTransaction.getMediaServerId().equals(mediaServerId)) { Device device = deviceService.queryDevice(ssrcTransaction.getDeviceId()); if (device == null) { continue; @@ -806,10 +962,36 @@ public class PlayServiceImpl implements IPlayService { } @Override - public void audioBroadcast(Device device, String channelId, int timeout, AudioBroadcastEvent event) { + public AudioBroadcastResult audioBroadcast(Device device, String channelId) { + if (device == null || channelId == null) { + return null; + } + logger.info("[语音喊话] device: {}, channel: {}", device.getDeviceId(), channelId); + DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId); + if (deviceChannel == null) { + logger.warn("开启语音广播的时候未找到通道: {}", channelId); + return null; + } + MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(); +// String app = "broadcast"; + // TODO 从sip user agent中判断是什么品牌设备,大华默认使用talk模式,其他使用broadcast模式 + String app = "talk"; + String stream = device.getDeviceId() + "_" + channelId; + StreamInfo broadcast = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "broadcast", stream, null, null, null, false); + AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult(); + audioBroadcastResult.setApp(app); + audioBroadcastResult.setStream(stream); + audioBroadcastResult.setStreamInfo(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null,false)); + audioBroadcastResult.setCodec("G.711"); + return audioBroadcastResult; + } + + @Override + public void audioBroadcastCmd(Device device, String channelId, int timeout, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException { if (device == null || channelId == null) { return; } + logger.info("[语音喊话] device: {}, channel: {}", device.getDeviceId(), channelId); DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId); if (deviceChannel == null) { logger.warn("开启语音广播的时候未找到通道: {}", channelId); @@ -818,7 +1000,7 @@ public class PlayServiceImpl implements IPlayService { } // 查询通道使用状态 if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { // 查询流是否存在,不存在则认为是异常状态 MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); @@ -827,8 +1009,8 @@ public class PlayServiceImpl implements IPlayService { logger.warn("语音广播已经开启: {}", channelId); event.call("语音广播已经开启"); return; - }else { - audioBroadcastManager.del(deviceChannel.getDeviceId(),channelId); + } else { + audioBroadcastManager.del(deviceChannel.getDeviceId(), channelId); redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channelId, sendRtpItem.getCallId(), sendRtpItem.getStreamId()); } } @@ -847,39 +1029,33 @@ public class PlayServiceImpl implements IPlayService { }); } + + @Override - public void stopAudioBroadcast(String deviceId, String channelId){ + public void stopAudioBroadcast(String deviceId, String channelId) { AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(deviceId, channelId); if (audioBroadcastCatch != null) { - try { - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null); - if (sendRtpItem != null) { - redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null); - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - Map param = new HashMap<>(); - param.put("vhost", "__defaultVhost__"); - param.put("app", sendRtpItem.getApp()); - param.put("stream", sendRtpItem.getStreamId()); - zlmresTfulUtils.stopSendRtp(mediaInfo, param); - // 立刻结束设备的推流,等待自行结束太慢 - zlmresTfulUtils.closeStreams(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStreamId()); - } - if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) { - cmder.streamByeCmd(audioBroadcastCatch.getDialog(), audioBroadcastCatch.getChannelId(), audioBroadcastCatch.getRequest(), null); - } - audioBroadcastManager.del(deviceId, channelId); - - } catch (SipException e) { - throw new RuntimeException(e); - } catch (ParseException e) { - throw new RuntimeException(e); - } catch (InvalidArgumentException e) { - throw new RuntimeException(e); + Device device = deviceService.queryDevice(deviceId); + if (device == null) { + return; } +// if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) { +// cmder.streamByeCmd(device, audioBroadcastCatch.getChannelId(), null, audioBroadcastCatch.getSipTransactionInfo().getCallId()); +// } + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null); + if (sendRtpItem != null) { + redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Map param = new HashMap<>(); + param.put("vhost", "__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStreamId()); + zlmresTfulUtils.stopSendRtp(mediaInfo, param); + } + + audioBroadcastManager.del(deviceId, channelId); } - - } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index ff9a39ab..bcd8bf17 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -216,65 +216,20 @@ public class PlayController { @Parameter(name = "timeout", description = "推流超时时间(秒)", required = true) @GetMapping("/broadcast/{deviceId}/{channelId}") @PostMapping("/broadcast/{deviceId}/{channelId}") - public DeferredResult broadcastApi(@PathVariable String deviceId, @PathVariable String channelId, Integer timeout) { + public AudioBroadcastResult broadcastApi(@PathVariable String deviceId, @PathVariable String channelId, Integer timeout) { if (logger.isDebugEnabled()) { logger.debug("语音广播API调用"); } Device device = storager.queryVideoDevice(deviceId); - DeferredResult result = new DeferredResult<>(3 * 1000L); if (device == null) { - result.setResult("未找到设备: " + deviceId); - return result; + throw new ControllerException(ErrorCode.ERROR400.getCode(), "未找到设备: " + deviceId); } if (channelId == null) { - result.setResult("未找到通道: " + channelId); - return result; + throw new ControllerException(ErrorCode.ERROR400.getCode(), "未找到通道: " + channelId); } - String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + deviceId; - if (resultHolder.exist(key, null)) { - result.setResult("设备使用中"); - return result; - } - if (timeout == null){ - timeout = 30; - } - String uuid = UUID.randomUUID().toString(); - result.onTimeout(() -> { - logger.warn("语音广播操作超时, 设备未返回应答指令"); - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setId(uuid); - JSONObject json = new JSONObject(); - json.put("DeviceID", deviceId); - json.put("CmdType", "Broadcast"); - json.put("Result", "Failed"); - json.put("Error", "Timeout. Device did not response to broadcast command."); - msg.setData(json); - resultHolder.invokeResult(msg); - }); + return playService.audioBroadcast(device, channelId); - result.onTimeout(()->{ - WVPResult wvpResult = new WVPResult<>(); - wvpResult.setCode(-1); - wvpResult.setMsg("请求超时"); - RequestMessage requestMessage = new RequestMessage(); - requestMessage.setKey(key); - requestMessage.setData(wvpResult); - resultHolder.invokeAllResult(requestMessage); - }); - playService.audioBroadcast(device, channelId, timeout, (msg)->{ - WVPResult wvpResult = new WVPResult<>(); - wvpResult.setCode(-1); - wvpResult.setMsg(msg); - RequestMessage requestMessage = new RequestMessage(); - requestMessage.setKey(key); - requestMessage.setData(wvpResult); - resultHolder.invokeAllResult(requestMessage); - }); - resultHolder.put(key, uuid, result); - - return result; } @@ -283,10 +238,16 @@ public class PlayController { @Parameter(name = "channelId", description = "通道Id", required = true) @GetMapping("/broadcast/stop/{deviceId}/{channelId}") @PostMapping("/broadcast/stop/{deviceId}/{channelId}") - public void stopBroadcastA(@PathVariable String deviceId, @PathVariable String channelId) { + public void stopBroadcast(@PathVariable String deviceId, @PathVariable String channelId) { if (logger.isDebugEnabled()) { logger.debug("停止语音广播API调用"); } +// try { +// playService.stopAudioBroadcast(deviceId, channelId); +// } catch (InvalidArgumentException | ParseException | SsrcTransactionNotFoundException | SipException e) { +// logger.error("[命令发送失败] 停止语音: {}", e.getMessage()); +// throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); +// } playService.stopAudioBroadcast(deviceId, channelId); } diff --git a/web_src/config/index.js b/web_src/config/index.js index c5a74528..c6287d05 100644 --- a/web_src/config/index.js +++ b/web_src/config/index.js @@ -12,14 +12,14 @@ module.exports = { assetsPublicPath: '/', proxyTable: { '/debug': { - target: 'http://localhost:38080', + target: 'https://default.wvp-pro.cn:18080', changeOrigin: true, pathRewrite: { '^/debug': '/' } }, '/static/snap': { - target: 'http://localhost:38080', + target: 'https://default.wvp-pro.cn:18080', changeOrigin: true, // pathRewrite: { // '^/static/snap': '/static/snap' diff --git a/web_src/src/components/dialog/devicePlayer.vue b/web_src/src/components/dialog/devicePlayer.vue index ee5fd7b1..4c8f2641 100644 --- a/web_src/src/components/dialog/devicePlayer.vue +++ b/web_src/src/components/dialog/devicePlayer.vue @@ -1,7 +1,7 @@