From c2aaae9325db012c9960b69784330ced5ec15ab9 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 9 May 2022 18:15:30 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E6=AD=A5=E5=AE=9E=E7=8E=B0=E8=AF=AD?= =?UTF-8?q?=E9=9F=B3=E5=96=8A=E8=AF=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/gb28181/bean/AudioBroadcastCatch.java | 59 ++++++ .../bean/AudioBroadcastCatchStatus.java | 15 ++ .../iot/vmp/gb28181/bean/Device.java | 17 -- .../session/AudioBroadcastManager.java | 59 ++++++ .../vmp/gb28181/session/CatalogDataCatch.java | 3 - .../gb28181/transmit/cmd/ISIPCommander.java | 15 +- .../transmit/cmd/impl/SIPCommander.java | 81 +++---- .../request/impl/AckRequestProcessor.java | 3 + .../request/impl/InviteRequestProcessor.java | 188 ++++++++++++++++- .../impl/message/MessageHandlerAbstract.java | 21 ++ .../cmd/BroadcastResponseMessageHandler.java | 29 +-- .../vmp/media/zlm/ZLMRTPServerFactory.java | 2 +- .../media/zlm/dto/MediaServerItemLite.java | 197 ++++++++++++++++++ .../iot/vmp/service/IPlayService.java | 3 + .../iot/vmp/service/impl/PlayServiceImpl.java | 44 ++++ .../iot/vmp/storager/dao/DeviceMapper.java | 6 - .../vmanager/bean/AudioBroadcastResult.java | 62 ++++++ .../vmanager/gb28181/play/PlayController.java | 118 +++++------ .../play/bean/AudioBroadcastEvent.java | 9 + web_src/src/components/dialog/deviceEdit.vue | 7 +- 20 files changed, 760 insertions(+), 178 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatchStatus.java create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java create mode 100644 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItemLite.java create mode 100644 src/main/java/com/genersoft/iot/vmp/vmanager/bean/AudioBroadcastResult.java create mode 100644 src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/bean/AudioBroadcastEvent.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java new file mode 100644 index 00000000..64227d40 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java @@ -0,0 +1,59 @@ +package com.genersoft.iot.vmp.gb28181.bean; + + +/** + * 缓存语音广播的状态 + * @author lin + */ +public class AudioBroadcastCatch { + + + public AudioBroadcastCatch(String deviceId, String channelId, AudioBroadcastCatchStatus status) { + this.deviceId = deviceId; + this.channelId = channelId; + this.status = status; + } + + public AudioBroadcastCatch() { + } + + /** + * 设备编号 + */ + private String deviceId; + + /** + * 通道编号 + */ + private String channelId; + + /** + * 语音广播状态 + */ + private AudioBroadcastCatchStatus status; + + + public String getDeviceId() { + return deviceId; + } + + public void setDeviceId(String deviceId) { + this.deviceId = deviceId; + } + + public String getChannelId() { + return channelId; + } + + public void setChannelId(String channelId) { + this.channelId = channelId; + } + + public AudioBroadcastCatchStatus getStatus() { + return status; + } + + public void setStatus(AudioBroadcastCatchStatus status) { + this.status = status; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatchStatus.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatchStatus.java new file mode 100644 index 00000000..7d4f7c83 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatchStatus.java @@ -0,0 +1,15 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +/** + * 语音广播状态 + * @author lin + */ +public enum AudioBroadcastCatchStatus { + + // 发送语音广播消息等待对方回复语音广播 + Ready, + // 收到回复等待invite消息 + WaiteInvite, + // 收到invite消息 + Ok, +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java index 778608e5..1dc62ab3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java @@ -134,16 +134,6 @@ public class Device { */ private boolean ssrcCheck; - /** - * 设备用于接收语音消息的通道 - */ - private String audioChannelForReceive; - - /** - * 设备用于发送语音消息的通道 - */ - private String audioChannelForSend; - public String getDeviceId() { return deviceId; @@ -345,11 +335,4 @@ public class Device { this.ssrcCheck = ssrcCheck; } - public String getAudioChannelForReceive() { - return audioChannelForReceive; - } - - public void setAudioChannelForReceive(String audioChannelForReceive) { - this.audioChannelForReceive = audioChannelForReceive; - } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java new file mode 100644 index 00000000..dec96c00 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java @@ -0,0 +1,59 @@ +package com.genersoft.iot.vmp.gb28181.session; + +import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 语音广播消息管理类 + * @author lin + */ +@Component +public class AudioBroadcastManager { + + public static Map data = new ConcurrentHashMap<>(); + + public void add(AudioBroadcastCatch audioBroadcastCatch) { + this.update(audioBroadcastCatch); + } + + public void update(AudioBroadcastCatch audioBroadcastCatch) { + data.put(audioBroadcastCatch.getDeviceId() + audioBroadcastCatch.getChannelId(), audioBroadcastCatch); + } + + public void del(String deviceId, String channelId) { + data.remove(deviceId + channelId); + } + + public void delByDeviceId(String deviceId) { + for (String key : data.keySet()) { + if (key.startsWith(deviceId)) { + data.remove(key); + } + } + } + + public List getAll(){ + Collection values = data.values(); + return new ArrayList<>(values); + } + + + public boolean exit(String deviceId, String channelId) { + for (String key : data.keySet()) { + if (key.equals(deviceId + channelId)) { + return true; + } + } + return false; + } + + public AudioBroadcastCatch get(String deviceId, String channelId) { + return data.get(deviceId + channelId); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java index 62393d52..3d539143 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java @@ -20,9 +20,6 @@ public class CatalogDataCatch { public static Map data = new ConcurrentHashMap<>(); - @Autowired - private DeferredResultHolder deferredResultHolder; - @Autowired private IVideoManagerStorage storager; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index f4bcbb64..bd51cfa8 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -6,8 +6,12 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.stack.SIPDialog; import javax.sip.Dialog; +import javax.sip.SipException; +import java.text.ParseException; /** * @description:设备能力接口,用于定义设备的控制、查询能力 @@ -123,6 +127,7 @@ public interface ISIPCommander { */ void streamByeCmd(String deviceId, String channelId, String stream, String callId, SipSubscribe.Event okEvent); void streamByeCmd(String deviceId, String channelId, String stream, String callId); + void streamByeCmd(SIPDialog dialog, SIPRequest request, SipSubscribe.Event okEvent) throws SipException, ParseException; /** * 回放暂停 @@ -144,21 +149,13 @@ public interface ISIPCommander { */ void playSpeedCmd(Device device, StreamInfo streamInfo, Double speed); - /** - * 语音广播 - * - * @param device 视频设备 - * @param channelId 预览通道 - */ - boolean audioBroadcastCmd(Device device,String channelId); /** * 语音广播 * * @param device 视频设备 */ - void audioBroadcastCmd(Device device, SipSubscribe.Event okEvent); - boolean audioBroadcastCmd(Device device); + boolean audioBroadcastCmd(Device device, String channelId, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent); /** * 音视频录像控制 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index ea8f2025..8ff1e45b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -733,42 +733,34 @@ public class SIPCommander implements ISIPCommander { } } - Request byeRequest = dialog.createRequest(Request.BYE); - SipURI byeURI = (SipURI) byeRequest.getRequestURI(); - SIPRequest request = (SIPRequest)transaction.getRequest(); - byeURI.setHost(request.getRemoteAddress().getHostAddress()); - byeURI.setPort(request.getRemotePort()); - ViaHeader viaHeader = (ViaHeader) byeRequest.getHeader(ViaHeader.NAME); - String protocol = viaHeader.getTransport().toUpperCase(); - ClientTransaction clientTransaction = null; - if("TCP".equals(protocol)) { - clientTransaction = tcpSipProvider.getNewClientTransaction(byeRequest); - } else if("UDP".equals(protocol)) { - clientTransaction = udpSipProvider.getNewClientTransaction(byeRequest); - } - - CallIdHeader callIdHeader = (CallIdHeader) byeRequest.getHeader(CallIdHeader.NAME); - if (okEvent != null) { - sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent); - } - - dialog.sendRequest(clientTransaction); + streamByeCmd(dialog, (SIPRequest)transaction.getRequest(), okEvent); } catch (SipException | ParseException e) { e.printStackTrace(); } } - /** - * 语音广播 - * - * @param device 视频设备 - * @param channelId 预览通道 - */ @Override - public boolean audioBroadcastCmd(Device device, String channelId) { - // 改为新的实现 - return false; + public void streamByeCmd(SIPDialog dialog, SIPRequest request, SipSubscribe.Event okEvent) throws SipException, ParseException { + Request byeRequest = dialog.createRequest(Request.BYE); + SipURI byeURI = (SipURI) byeRequest.getRequestURI(); + byeURI.setHost(request.getRemoteAddress().getHostAddress()); + byeURI.setPort(request.getRemotePort()); + ViaHeader viaHeader = (ViaHeader) byeRequest.getHeader(ViaHeader.NAME); + String protocol = viaHeader.getTransport().toUpperCase(); + ClientTransaction clientTransaction = null; + if("TCP".equals(protocol)) { + clientTransaction = tcpSipProvider.getNewClientTransaction(byeRequest); + } else if("UDP".equals(protocol)) { + clientTransaction = udpSipProvider.getNewClientTransaction(byeRequest); + } + + CallIdHeader callIdHeader = (CallIdHeader) byeRequest.getHeader(CallIdHeader.NAME); + if (okEvent != null) { + sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent); + } + + dialog.sendRequest(clientTransaction); } /** @@ -777,7 +769,7 @@ public class SIPCommander implements ISIPCommander { * @param device 视频设备 */ @Override - public boolean audioBroadcastCmd(Device device) { + public boolean audioBroadcastCmd(Device device,String channelId, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) { try { StringBuffer broadcastXml = new StringBuffer(200); String charset = device.getCharset(); @@ -786,7 +778,7 @@ public class SIPCommander implements ISIPCommander { broadcastXml.append("Broadcast\r\n"); broadcastXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); broadcastXml.append("" + sipConfig.getId() + "\r\n"); - broadcastXml.append("" + device.getDeviceId() + "\r\n"); + broadcastXml.append("" + channelId + "\r\n"); broadcastXml.append("\r\n"); String tm = Long.toString(System.currentTimeMillis()); @@ -795,39 +787,14 @@ public class SIPCommander implements ISIPCommander { : udpSipProvider.getNewCallId(); Request request = headerProvider.createMessageRequest(device, broadcastXml.toString(), "z9hG4bK-ViaBcst-" + tm, "FromBcst" + tm, null, callIdHeader); - transmitRequest(device, request); + transmitRequest(device, request, errorEvent, okEvent); return true; } catch (SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); } return false; } - @Override - public void audioBroadcastCmd(Device device, SipSubscribe.Event errorEvent) { - try { - StringBuffer broadcastXml = new StringBuffer(200); - String charset = device.getCharset(); - broadcastXml.append("\r\n"); - broadcastXml.append("\r\n"); - broadcastXml.append("Broadcast\r\n"); - broadcastXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); - broadcastXml.append("" + sipConfig.getId() + "\r\n"); - broadcastXml.append("" + device.getDeviceId() + "\r\n"); - broadcastXml.append("\r\n"); - - String tm = Long.toString(System.currentTimeMillis()); - CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() - : udpSipProvider.getNewCallId(); - - Request request = headerProvider.createMessageRequest(device, broadcastXml.toString(), "z9hG4bK-ViaBcst-" + tm, "FromBcst" + tm, null, callIdHeader); - transmitRequest(device, request, errorEvent); - } catch (SipException | ParseException | InvalidArgumentException e) { - e.printStackTrace(); - } - } - - /** * 音视频录像控制 * diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 0f65bf52..5fd6c99b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -94,6 +94,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In param.put("dst_port", sendRtpItem.getPort()); param.put("is_udp", is_Udp); param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", 8); + param.put("use_ps", 0); + param.put("only_audio", 1); zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index bb46a71c..ae49e2cd 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -2,21 +2,27 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; +import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItemLite; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; @@ -24,8 +30,12 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.SerializeUtils; +import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import gov.nist.javax.sdp.TimeDescriptionImpl; import gov.nist.javax.sdp.fields.TimeField; +import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.stack.SIPDialog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -41,6 +51,7 @@ import javax.sip.message.Response; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.List; import java.util.Vector; /** @@ -73,7 +84,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements private IPlayService playService; @Autowired - private ISIPCommander commander; + private AudioBroadcastManager audioBroadcastManager; @Autowired private ZLMRTPServerFactory zlmrtpServerFactory; @@ -93,6 +104,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private ZLMMediaListManager mediaListManager; + @Autowired + private DeferredResultHolder resultHolder; + + @Autowired + private ZLMHttpHookSubscribe subscribe; + + @Autowired + private SipConfig config; + @Override public void afterPropertiesSet() throws Exception { @@ -126,7 +146,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 查询请求是否来自上级平台\设备 ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); if (platform == null) { - inviteFromDeviceHandle(evt, requesterId); + inviteFromDeviceHandle(evt, requesterId, channelId); }else { // 查询平台下是否有该通道 DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); @@ -542,10 +562,25 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } - public void inviteFromDeviceHandle(RequestEvent evt, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException { + public void inviteFromDeviceHandle(RequestEvent evt, String requesterId, String channelId) throws InvalidArgumentException, ParseException, SipException, SdpException { + // 兼容奇葩的海康这里使用的不是通道编号而是本平台编号 +// if (channelId.equals(config.getId())) { +// List all = audioBroadcastManager.getAll(); +// for (AudioBroadcastCatch audioBroadcastCatch : all) { +// if (audioBroadcastCatch.getDeviceId().equals(requesterId)) { +// channelId = audioBroadcastCatch.getChannelId(); +// } +// } +// } +// // 兼容失败 +// if (channelId.equals(config.getId())) { +// responseAck(evt, Response.BAD_REQUEST); +// return; +// } // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) Device device = redisCatchStorage.getDevice(requesterId); + Request request = evt.getRequest(); if (device != null) { logger.info("收到设备" + requesterId + "的语音广播Invite请求"); @@ -558,7 +593,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements int ssrcIndex = contentString.indexOf("y="); if (ssrcIndex > 0) { substring = contentString.substring(0, ssrcIndex); - ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); + ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12).trim(); } ssrcIndex = substring.indexOf("f="); if (ssrcIndex > 0) { @@ -568,6 +603,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 获取支持的格式 Vector mediaDescriptions = sdp.getMediaDescriptions(true); + // 查看是否支持PS 负载96 int port = -1; //boolean recvonly = false; @@ -602,10 +638,150 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 return; } - String username = sdp.getOrigin().getUsername(); + String sessionName = sdp.getSessionName().getValue(); String addressStr = sdp.getOrigin().getAddress(); - logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc); + logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", requesterId, addressStr, port, ssrc); + MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device); + if (mediaServerItem == null) { + logger.warn("未找到可用的zlm"); + responseAck(evt, Response.BUSY_HERE); + return; + } + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, + device.getDeviceId(), channelId, + mediaTransmissionTCP); + sendRtpItem.setTcp(mediaTransmissionTCP); + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + if (sendRtpItem == null) { + logger.warn("服务器端口资源不足"); + responseAck(evt, Response.BUSY_HERE); + return; + } + + String app = "broadcast"; + String stream = device.getDeviceId() + "_" + channelId; + + CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); + sendRtpItem.setPlayType(InviteStreamType.PLAY); + sendRtpItem.setCallId(callIdHeader.getCallId()); + sendRtpItem.setPlatformId(requesterId); + sendRtpItem.setStatus(1); + sendRtpItem.setApp(app); + sendRtpItem.setStreamId(stream); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + + // hook监听等待设备推流上来 + // 添加订阅 + JSONObject subscribeKey = new JSONObject(); + subscribeKey.put("app", app); + subscribeKey.put("stream", stream); + subscribeKey.put("regist", true); + subscribeKey.put("schema", "rtmp"); + subscribeKey.put("mediaServerId", mediaServerItem.getId()); + String finalSsrc = ssrc; + String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + channelId; + if (zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream)) { + logger.info("发现已经在推流"); + dynamicTask.stop(waiteStreamTimeoutTaskKey); + sendRtpItem.setStatus(2); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + StringBuffer content = new StringBuffer(200); + content.append("v=0\r\n"); + content.append("o="+ config.getId() +" "+ sdp.getOrigin().getSessionId() +" " + sdp.getOrigin().getSessionVersion() + " IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); + content.append("s=Play\r\n"); + content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); + content.append("t=0 0\r\n"); + content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); + content.append("a=sendonly\r\n"); + content.append("a=rtpmap:8 PCMA/8000\r\n"); + content.append("y="+ finalSsrc + "\r\n"); + content.append("f=v/////a/1/8/1\r\n"); + + ParentPlatform parentPlatform = new ParentPlatform(); + parentPlatform.setServerIP(device.getIp()); + parentPlatform.setServerPort(device.getPort()); + parentPlatform.setServerGBId(device.getDeviceId()); + try { + responseSdpAck(evt, content.toString(), parentPlatform); + } catch (SipException e) { + throw new RuntimeException(e); + } catch (InvalidArgumentException e) { + throw new RuntimeException(e); + } catch (ParseException e) { + throw new RuntimeException(e); + } + }else { + // 设置等待推流的超时; 默认20s + String finalChannelId = channelId; + dynamicTask.startDelay(waiteStreamTimeoutTaskKey, ()->{ + logger.info("等待推流超时: {}/{}", app, stream); + if (audioBroadcastManager.exit(device.getDeviceId(), finalChannelId)) { + audioBroadcastManager.del(device.getDeviceId(), finalChannelId); + }else { + // 兼容海康使用了错误的通道ID的情况 + audioBroadcastManager.delByDeviceId(device.getDeviceId()); + } + + // 发送bye + try { + cmder.streamByeCmd((SIPDialog)evt.getServerTransaction().getDialog(), (SIPRequest) evt.getRequest(), null); + } catch (SipException e) { + throw new RuntimeException(e); + } catch (ParseException e) { + throw new RuntimeException(e); + } + }, 20*1000); + + subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, + (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + sendRtpItem.setStatus(2); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + StringBuffer content = new StringBuffer(200); + content.append("v=0\r\n"); + content.append("o="+ finalChannelId +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); + content.append("s=Play\r\n"); + content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); + content.append("t=0 0\r\n"); + content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); + content.append("a=sendonly\r\n"); + content.append("a=rtpmap:8 PCMA/8000\r\n"); + content.append("y="+ finalSsrc + "\r\n"); + content.append("f=v/////a/1/8/1\r\n"); + + ParentPlatform parentPlatform = new ParentPlatform(); + parentPlatform.setServerIP(device.getIp()); + parentPlatform.setServerPort(device.getPort()); + parentPlatform.setServerGBId(device.getDeviceId()); + try { + responseSdpAck(evt, content.toString(), parentPlatform); + } catch (SipException e) { + throw new RuntimeException(e); + } catch (InvalidArgumentException e) { + throw new RuntimeException(e); + } catch (ParseException e) { + throw new RuntimeException(e); + } + }); + } + String timeOutTaskKey = "audio-broadcast-" + device.getDeviceId() + channelId; + dynamicTask.stop(timeOutTaskKey); + String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId(); + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(0); + wvpResult.setMsg("success"); + AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult(); + audioBroadcastResult.setApp(app); + audioBroadcastResult.setStream(stream); + audioBroadcastResult.setMediaServerItem(new MediaServerItemLite(mediaServerItem)); + audioBroadcastResult.setCodec("G.711"); + wvpResult.setData(audioBroadcastResult); + RequestMessage requestMessage = new RequestMessage(); + requestMessage.setKey(key); + requestMessage.setData(wvpResult); + resultHolder.invokeAllResult(requestMessage); } else { logger.warn("来自无效设备/平台的请求"); responseAck(evt, Response.BAD_REQUEST); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java index afaa7cb3..cf7d1122 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java @@ -6,7 +6,11 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP import org.dom4j.Element; import org.springframework.beans.factory.annotation.Autowired; +import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; +import javax.sip.SipException; +import javax.sip.message.Response; +import java.text.ParseException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -23,6 +27,10 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i @Override public void handForDevice(RequestEvent evt, Device device, Element element) { String cmd = getText(element, "CmdType"); + if (cmd == null) { + handNullCmd(evt); + return; + } IMessageHandler messageHandler = messageHandlerMap.get(cmd); if (messageHandler != null) { messageHandler.handForDevice(evt, device, element); @@ -37,4 +45,17 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i messageHandler.handForPlatform(evt, parentPlatform, element); } } + + public void handNullCmd(RequestEvent evt){ + try { + responseAck(evt, Response.OK); + } catch (SipException e) { + throw new RuntimeException(e); + } catch (InvalidArgumentException e) { + throw new RuntimeException(e); + } catch (ParseException e) { + throw new RuntimeException(e); + } + return; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java index ac946551..e684be38 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java @@ -1,8 +1,11 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; +import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatchStatus; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; @@ -36,6 +39,9 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i @Autowired private DeferredResultHolder deferredResultHolder; + @Autowired + private AudioBroadcastManager audioBroadcastManager; + @Override public void afterPropertiesSet() throws Exception { responseMessageHandler.addHandler(cmdType, this); @@ -45,21 +51,16 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i public void handForDevice(RequestEvent evt, Device device, Element rootElement) { try { String channelId = getText(rootElement, "DeviceID"); - String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId() + channelId; - // 回复200 OK - responseAck(evt, Response.OK); - // 此处是对本平台发出Broadcast指令的应答 - JSONObject json = new JSONObject(); - XmlUtil.node2Json(rootElement, json); - if (logger.isDebugEnabled()) { - logger.debug(json.toJSONString()); + if (!audioBroadcastManager.exit(device.getDeviceId(), channelId)) { + // 回复410 + responseAck(evt, Response.GONE); + return; } - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setData(json); - deferredResultHolder.invokeAllResult(msg); - - + logger.info("收到语音广播的回复:{}/{}", device.getDeviceId(), channelId ); + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId); + audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite); + audioBroadcastManager.update(audioBroadcastCatch); + responseAck(evt, Response.OK); } catch (ParseException | SipException | InvalidArgumentException e) { e.printStackTrace(); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index a7e60167..5ab8044c 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -271,7 +271,7 @@ public class ZLMRTPServerFactory { * 查询待转推的流是否就绪 */ public Boolean isStreamReady(MediaServerItem mediaServerItem, String app, String streamId) { - JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtmp", streamId); + JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtsp", streamId); return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItemLite.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItemLite.java new file mode 100644 index 00000000..de68e30c --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItemLite.java @@ -0,0 +1,197 @@ +package com.genersoft.iot.vmp.media.zlm.dto; + + +import com.genersoft.iot.vmp.gb28181.session.SsrcConfig; +import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; +import org.springframework.util.StringUtils; + +import java.util.HashMap; + +/** + * 精简的MediaServerItem信息,方便给前端返回数据 + */ +public class MediaServerItemLite { + + private String id; + + private String ip; + + private String hookIp; + + private String sdpIp; + + private String streamIp; + + private int httpPort; + + private int httpSSlPort; + + private int rtmpPort; + + private int rtmpSSlPort; + + private int rtpProxyPort; + + private int rtspPort; + + private int rtspSSLPort; + + private String secret; + + private int streamNoneReaderDelayMS; + + private int hookAliveInterval; + + private int recordAssistPort; + + + + public MediaServerItemLite(MediaServerItem mediaServerItem) { + this.id = mediaServerItem.getId(); + this.ip = mediaServerItem.getIp(); + this.hookIp = mediaServerItem.getHookIp(); + this.sdpIp = mediaServerItem.getSdpIp(); + this.streamIp = mediaServerItem.getStreamIp(); + this.httpPort = mediaServerItem.getHttpPort(); + this.httpSSlPort = mediaServerItem.getHttpSSlPort(); + this.rtmpPort = mediaServerItem.getRtmpPort(); + this.rtmpSSlPort = mediaServerItem.getRtmpSSlPort(); + this.rtpProxyPort = mediaServerItem.getRtpProxyPort(); + this.rtspPort = mediaServerItem.getRtspPort(); + this.rtspSSLPort = mediaServerItem.getRtspSSLPort(); + this.secret = mediaServerItem.getSecret(); + this.streamNoneReaderDelayMS = mediaServerItem.getStreamNoneReaderDelayMS(); + this.hookAliveInterval = mediaServerItem.getHookAliveInterval(); + this.streamNoneReaderDelayMS = mediaServerItem.getStreamNoneReaderDelayMS(); + this.recordAssistPort = mediaServerItem.getRecordAssistPort(); + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public String getHookIp() { + return hookIp; + } + + public void setHookIp(String hookIp) { + this.hookIp = hookIp; + } + + public String getSdpIp() { + return sdpIp; + } + + public void setSdpIp(String sdpIp) { + this.sdpIp = sdpIp; + } + + public String getStreamIp() { + return streamIp; + } + + public void setStreamIp(String streamIp) { + this.streamIp = streamIp; + } + + public int getHttpPort() { + return httpPort; + } + + public void setHttpPort(int httpPort) { + this.httpPort = httpPort; + } + + public int getHttpSSlPort() { + return httpSSlPort; + } + + public void setHttpSSlPort(int httpSSlPort) { + this.httpSSlPort = httpSSlPort; + } + + public int getRtmpPort() { + return rtmpPort; + } + + public void setRtmpPort(int rtmpPort) { + this.rtmpPort = rtmpPort; + } + + public int getRtmpSSlPort() { + return rtmpSSlPort; + } + + public void setRtmpSSlPort(int rtmpSSlPort) { + this.rtmpSSlPort = rtmpSSlPort; + } + + public int getRtpProxyPort() { + return rtpProxyPort; + } + + public void setRtpProxyPort(int rtpProxyPort) { + this.rtpProxyPort = rtpProxyPort; + } + + public int getRtspPort() { + return rtspPort; + } + + public void setRtspPort(int rtspPort) { + this.rtspPort = rtspPort; + } + + public int getRtspSSLPort() { + return rtspSSLPort; + } + + public void setRtspSSLPort(int rtspSSLPort) { + this.rtspSSLPort = rtspSSLPort; + } + + + public String getSecret() { + return secret; + } + + public void setSecret(String secret) { + this.secret = secret; + } + + public int getStreamNoneReaderDelayMS() { + return streamNoneReaderDelayMS; + } + + public void setStreamNoneReaderDelayMS(int streamNoneReaderDelayMS) { + this.streamNoneReaderDelayMS = streamNoneReaderDelayMS; + } + + public int getHookAliveInterval() { + return hookAliveInterval; + } + + public void setHookAliveInterval(int hookAliveInterval) { + this.hookAliveInterval = hookAliveInterval; + } + + public int getRecordAssistPort() { + return recordAssistPort; + } + + public void setRecordAssistPort(int recordAssistPort) { + this.recordAssistPort = recordAssistPort; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 9e764931..63005516 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; import com.genersoft.iot.vmp.service.bean.PlayBackCallback; import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import org.springframework.http.ResponseEntity; import org.springframework.web.context.request.async.DeferredResult; @@ -40,4 +41,6 @@ public interface IPlayService { DeferredResult> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack); StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream); + + void audioBroadcast(Device device, String channelId, int timeout, AudioBroadcastEvent event); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index f2b6c282..a647c84c 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; @@ -26,7 +27,9 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IPlayService; @@ -57,6 +60,9 @@ public class PlayServiceImpl implements IPlayService { @Autowired private SIPCommander cmder; + @Autowired + private AudioBroadcastManager audioBroadcastManager; + @Autowired private SIPCommanderFroPlatform sipCommanderFroPlatform; @@ -621,4 +627,42 @@ public class PlayServiceImpl implements IPlayService { } } } + + @Override + public void audioBroadcast(Device device, String channelId, int timeout, AudioBroadcastEvent event) { + if (device == null || channelId == null) { + return; + } + DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId); + if (deviceChannel == null) { + logger.warn("开启语音广播的时候未找到通道: {}", channelId); + event.call("开启语音广播的时候未找到通道"); + return; + } + // 查询通道使用状态 + if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { + logger.warn("语音广播已经开启: {}", channelId); + event.call("语音广播已经开启"); + return; + } + String timeOutTaskKey = "audio-broadcast-" + device.getDeviceId() + channelId; + dynamicTask.startDelay(timeOutTaskKey, ()->{ + logger.error("语音广播发送超时: {}:{}", device.getDeviceId(), channelId); + event.call("语音广播发送超时"); + audioBroadcastManager.del(device.getDeviceId(), channelId); + }, timeout * 1000); + + // 发送通知 + cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> { + // 发送成功 + AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready); + audioBroadcastManager.add(audioBroadcastCatch); + }, eventResultForError -> { + dynamicTask.stop(timeOutTaskKey); + // 发送失败 + logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg); + event.call("语音广播发送失败"); + audioBroadcastManager.del(device.getDeviceId(), channelId); + }); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java index d1c942ff..97cf2ccf 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java @@ -37,8 +37,6 @@ public interface DeviceMapper { "subscribeCycleForMobilePosition," + "mobilePositionSubmissionInterval," + "subscribeCycleForAlarm," + - "audioChannelForReceive," + - "audioChannelForSend," + "ssrcCheck," + "online" + ") VALUES (" + @@ -62,8 +60,6 @@ public interface DeviceMapper { "#{subscribeCycleForMobilePosition}," + "#{mobilePositionSubmissionInterval}," + "#{subscribeCycleForAlarm}," + - "#{audioChannelForReceive}," + - "#{audioChannelForSend}," + "#{ssrcCheck}," + "#{online}" + ")") @@ -90,8 +86,6 @@ public interface DeviceMapper { ", subscribeCycleForMobilePosition=${subscribeCycleForMobilePosition}" + ", mobilePositionSubmissionInterval=${mobilePositionSubmissionInterval}" + ", subscribeCycleForAlarm=${subscribeCycleForAlarm}" + - ", audioChannelForReceive=#{audioChannelForReceive}" + - ", audioChannelForSend=#{audioChannelForSend}" + ", ssrcCheck=${ssrcCheck}" + "WHERE deviceId='${deviceId}'"+ " "}) diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/AudioBroadcastResult.java b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/AudioBroadcastResult.java new file mode 100644 index 00000000..a722ae89 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/AudioBroadcastResult.java @@ -0,0 +1,62 @@ +package com.genersoft.iot.vmp.vmanager.bean; + +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItemLite; + +/** + * @author lin + */ +public class AudioBroadcastResult { + /** + * 推流的媒体节点信息 + */ + private MediaServerItemLite mediaServerItem; + + /** + * 编码格式 + */ + private String codec; + + /** + * 向zlm推流的应用名 + */ + private String app; + + /** + * 向zlm推流的流ID + */ + private String stream; + + + public MediaServerItemLite getMediaServerItem() { + return mediaServerItem; + } + + public void setMediaServerItem(MediaServerItemLite mediaServerItem) { + this.mediaServerItem = mediaServerItem; + } + + public String getCodec() { + return codec; + } + + public void setCodec(String codec) { + this.codec = codec; + } + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index dc88da0a..d587e0d0 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import com.genersoft.iot.vmp.service.IMediaService; @@ -39,6 +40,9 @@ import org.springframework.web.context.request.async.DeferredResult; import java.util.List; import java.util.UUID; +/** + * @author lin + */ @Api(tags = "国标设备点播") @CrossOrigin @RestController @@ -102,7 +106,7 @@ public class PlayController { logger.debug(String.format("设备预览/回放停止API调用,streamId:%s_%s", deviceId, channelId )); String uuid = UUID.randomUUID().toString(); - DeferredResult> result = new DeferredResult>(); + DeferredResult> result = new DeferredResult<>(); // 录像查询以channelId作为deviceId查询 String key = DeferredResultHolder.CALLBACK_CMD_STOP + deviceId + channelId; @@ -123,7 +127,7 @@ public class PlayController { RequestMessage msgForSuccess = new RequestMessage(); msgForSuccess.setId(uuid); msgForSuccess.setKey(key); - msgForSuccess.setData(String.format("success")); + msgForSuccess.setData("success"); resultHolder.invokeAllResult(msgForSuccess); }); @@ -251,81 +255,73 @@ public class PlayController { @ApiOperation("语音广播命令") @ApiImplicitParams({ @ApiImplicitParam(name = "deviceId", value = "设备Id", dataTypeClass = String.class), - @ApiImplicitParam(name = "channelForSend", value = "设备用于发送语音数据的通道", dataTypeClass = String.class), - @ApiImplicitParam(name = "channelForReceive", value = "设备用于接收语音数据的通道", dataTypeClass = String.class), + @ApiImplicitParam(name = "channelId", value = "通道Id", dataTypeClass = String.class), + @ApiImplicitParam(name = "timeout", value = "推流超时时间(秒)", dataTypeClass = Integer.class), }) - @GetMapping("/broadcast/{deviceId}") - @PostMapping("/broadcast/{deviceId}") - public DeferredResult> broadcastApi(@PathVariable String deviceId, - String channelForSend, - String channelForReceive) { + @GetMapping("/broadcast/{deviceId}/{channelId}") + @PostMapping("/broadcast/{deviceId}/{channelId}") + public DeferredResult> broadcastApi(@PathVariable String deviceId, @PathVariable String channelId, Integer timeout) { if (logger.isDebugEnabled()) { logger.debug("语音广播API调用"); } Device device = storager.queryVideoDevice(deviceId); - DeferredResult> result = new DeferredResult<>(3 * 1000L); - String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + deviceId; + if (device == null) { + WVPResult result = new WVPResult<>(); + result.setCode(-1); + result.setMsg("未找到设备: " + deviceId); + DeferredResult> deferredResult = new DeferredResult<>(); + deferredResult.setResult(result); + return deferredResult; + } + if (channelId == null) { + WVPResult result = new WVPResult<>(); + result.setCode(-1); + result.setMsg("未找到通道: " + channelId); + DeferredResult> deferredResult = new DeferredResult<>(); + deferredResult.setResult(result); + return deferredResult; + } + + String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + deviceId; if (resultHolder.exist(key, null)) { - result.setResult(new ResponseEntity<>("设备使用中",HttpStatus.OK)); - return result; + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(-1); + wvpResult.setMsg("设备使用中"); + DeferredResult> deferredResult = new DeferredResult<>(); + deferredResult.setResult(wvpResult); + return deferredResult; } - -// playService.audioBroadcast(deviceId, channelForSend, channelForReceive); - - - - - - + if (timeout == null){ + timeout = 30; + } + DeferredResult> result = new DeferredResult<>(timeout.longValue()*1000 + 2000); String uuid = UUID.randomUUID().toString(); - if (device == null) { - - resultHolder.put(key, key, result); - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setId(uuid); - JSONObject json = new JSONObject(); - json.put("DeviceID", deviceId); - json.put("CmdType", "Broadcast"); - json.put("Result", "Failed"); - json.put("Description", "Device 不存在"); - msg.setData(json); - resultHolder.invokeResult(msg); - return result; - } - cmder.audioBroadcastCmd(device, (event) -> { - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setId(uuid); - JSONObject json = new JSONObject(); - json.put("DeviceID", deviceId); - json.put("CmdType", "Broadcast"); - json.put("Result", "Failed"); - json.put("Description", String.format("语音广播操作失败,错误码: %s, %s", event.statusCode, event.msg)); - msg.setData(json); - resultHolder.invokeResult(msg); + result.onTimeout(()->{ + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(-1); + wvpResult.setMsg("请求超时"); + RequestMessage requestMessage = new RequestMessage(); + requestMessage.setKey(key); + requestMessage.setData(wvpResult); + resultHolder.invokeAllResult(requestMessage); }); - - result.onTimeout(() -> { - logger.warn(String.format("语音广播操作超时, 设备未返回应答指令")); - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setId(uuid); - JSONObject json = new JSONObject(); - json.put("DeviceID", deviceId); - json.put("CmdType", "Broadcast"); - json.put("Result", "Failed"); - json.put("Error", "Timeout. Device did not response to broadcast command."); - msg.setData(json); - resultHolder.invokeResult(msg); + playService.audioBroadcast(device, channelId, timeout, (msg)->{ + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(-1); + wvpResult.setMsg(msg); + RequestMessage requestMessage = new RequestMessage(); + requestMessage.setKey(key); + requestMessage.setData(wvpResult); + resultHolder.invokeAllResult(requestMessage); }); resultHolder.put(key, uuid, result); + return result; } @ApiOperation("获取所有的ssrc") @GetMapping("/ssrc") - public WVPResult getSSRC() { + public WVPResult getSsrc() { if (logger.isDebugEnabled()) { logger.debug("获取所有的ssrc"); } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/bean/AudioBroadcastEvent.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/bean/AudioBroadcastEvent.java new file mode 100644 index 00000000..55b710f2 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/bean/AudioBroadcastEvent.java @@ -0,0 +1,9 @@ +package com.genersoft.iot.vmp.vmanager.gb28181.play.bean; + + +/** + * @author lin + */ +public interface AudioBroadcastEvent { + void call(String msg); +} diff --git a/web_src/src/components/dialog/deviceEdit.vue b/web_src/src/components/dialog/deviceEdit.vue index 745a3e65..7e4b3b25 100644 --- a/web_src/src/components/dialog/deviceEdit.vue +++ b/web_src/src/components/dialog/deviceEdit.vue @@ -37,9 +37,6 @@ - - - @@ -105,6 +102,8 @@ export default { }) }, onSubmit: function () { + console.log("onSubmit"); + console.log(this.form); this.form.subscribeCycleForCatalog = this.form.subscribeCycleForCatalog||0 this.form.subscribeCycleForMobilePosition = this.form.subscribeCycleForMobilePosition||0 this.form.mobilePositionSubmissionInterval = this.form.mobilePositionSubmissionInterval||0 @@ -124,7 +123,7 @@ export default { }); } }).catch(function (error) { - console.error(error); + console.log(error); }); }, close: function () {