From 100252a253263321873e79d43dff94e19defe353 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 22 Feb 2023 18:06:52 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E8=AF=AD=E9=9F=B3=E5=AF=B9?= =?UTF-8?q?=E8=AE=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/conf/redis/RedisConfig.java | 3 +- .../vmp/gb28181/bean/AudioBroadcastCatch.java | 50 ++++++++++++++++++- .../request/impl/InviteRequestProcessor.java | 40 ++++++--------- .../vmp/media/zlm/ZLMHttpHookListener.java | 13 +++-- .../iot/vmp/service/IPlayService.java | 4 +- .../iot/vmp/service/impl/PlayServiceImpl.java | 6 +-- 6 files changed, 74 insertions(+), 42 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java index 1eca1319..2107c325 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.conf.redis; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.service.redisMsg.*; +import com.genersoft.iot.vmp.utils.redis.FastJsonRedisSerializer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.context.annotation.Bean; @@ -13,8 +14,6 @@ import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.serializer.StringRedisSerializer; -import com.genersoft.iot.vmp.utils.redis.FastJsonRedisSerializer; - /** * @description:Redis中间件配置类,使用spring-data-redis集成,自动从application.yml中加载redis配置 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java index 88db8071..bcf320d6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.bean; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import gov.nist.javax.sip.message.SIPResponse; /** @@ -10,10 +11,18 @@ import gov.nist.javax.sip.message.SIPResponse; public class AudioBroadcastCatch { - public AudioBroadcastCatch(String deviceId, String channelId, AudioBroadcastCatchStatus status) { + public AudioBroadcastCatch(String deviceId, + String channelId, + AudioBroadcastCatchStatus status, + MediaServerItem mediaServerItem, + String app, + String stream) { this.deviceId = deviceId; this.channelId = channelId; this.status = status; + this.mediaServerItem = mediaServerItem; + this.app = app; + this.stream = stream; } public AudioBroadcastCatch() { @@ -29,6 +38,21 @@ public class AudioBroadcastCatch { */ private String channelId; + /** + * 使用的流媒体 + */ + private MediaServerItem mediaServerItem; + + /** + * 待推送给设备的流应用名 + */ + private String app; + + /** + * 待推送给设备的流ID + */ + private String stream; + /** * 语音广播状态 */ @@ -68,6 +92,30 @@ public class AudioBroadcastCatch { return sipTransactionInfo; } + public MediaServerItem getMediaServerItem() { + return mediaServerItem; + } + + public void setMediaServerItem(MediaServerItem mediaServerItem) { + this.mediaServerItem = mediaServerItem; + } + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + public void setSipTransactionInfo(SipTransactionInfo sipTransactionInfo) { this.sipTransactionInfo = sipTransactionInfo; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 56cadb34..b35fc303 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -903,8 +903,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) Device device = redisCatchStorage.getDevice(requesterId); - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(requesterId, channelId); - if (audioBroadcastCatch == null) { + AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(requesterId, channelId); + if (broadcastCatch == null) { logger.warn("来自设备的Invite请求非语音广播,已忽略,requesterId: {}/{}", requesterId, channelId); try { responseAck(request, Response.FORBIDDEN); @@ -915,13 +915,13 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } if (device != null) { logger.info("收到设备" + requesterId + "的语音广播Invite请求"); - String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId() + audioBroadcastCatch.getChannelId(); + String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId() + broadcastCatch.getChannelId(); dynamicTask.stop(key); try { responseAck(request, Response.TRYING); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); - playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); + playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId()); return; } String contentString = new String(request.getRawContent()); @@ -977,7 +977,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(request, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 不支持的媒体格式: {}", e.getMessage()); - playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); + playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId()); return; } return; @@ -986,19 +986,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}, {}", requesterId, addressStr, port, ssrc, mediaTransmissionTCP ? (tcpActive? "TCP主动":"TCP被动") : "UDP"); - MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device); - if (mediaServerItem == null) { - logger.warn("未找到可用的zlm"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite 未找到可用的zlm: {}", e.getMessage()); - playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); - } - return; - } + MediaServerItem mediaServerItem = broadcastCatch.getMediaServerItem(); SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - device.getDeviceId(), audioBroadcastCatch.getChannelId(), + device.getDeviceId(), broadcastCatch.getChannelId(), mediaTransmissionTCP, false); if (sendRtpItem == null) { @@ -1007,22 +997,20 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(request, Response.BUSY_HERE); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); - playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); + playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId()); return; } return; } - String app = "broadcast"; - String stream = device.getDeviceId() + "_" + audioBroadcastCatch.getChannelId(); CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); sendRtpItem.setPlayType(InviteStreamType.TALK); sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setPlatformId(requesterId); sendRtpItem.setStatus(1); - sendRtpItem.setApp(app); - sendRtpItem.setStreamId(stream); + sendRtpItem.setApp(broadcastCatch.getApp()); + sendRtpItem.setStreamId(broadcastCatch.getStream()); sendRtpItem.setPt(8); sendRtpItem.setUsePs(false); sendRtpItem.setRtcp(false); @@ -1034,22 +1022,22 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements redisCatchStorage.updateSendRTPSever(sendRtpItem); - Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream); + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, broadcastCatch.getApp(), broadcastCatch.getStream()); if (streamReady) { sendOk(device, sendRtpItem, sdp, request, mediaServerItem, mediaTransmissionTCP, ssrc); }else { - logger.warn("[语音通话], 未发现待推送的流,app={},stream={}", app, stream); + logger.warn("[语音通话], 未发现待推送的流,app={},stream={}", broadcastCatch.getApp(), broadcastCatch.getStream()); try { responseAck(request, Response.GONE); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 语音通话 回复410失败, {}", e.getMessage()); return; } - playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); + playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId()); } } catch (SdpException e) { logger.error("[SDP解析异常]", e); - playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); + playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId()); } } else { logger.warn("来自无效设备/平台的请求"); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index dae42c75..8770d0e7 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -274,18 +274,17 @@ public class ZLMHttpHookListener { logger.info("[ZLM HOOK] 流注销, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); } - + MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); JSONObject json = (JSONObject) JSON.toJSON(param); taskExecutor.execute(() -> { ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json); if (subscribe != null) { - MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); + if (mediaInfo != null) { subscribe.response(mediaInfo, json); } } // 流消失移除redis play - List tracks = param.getTracks(); if (param.isRegist()) { if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal() || param.getOriginType() == OriginType.RTSP_PUSH.ordinal() @@ -343,7 +342,7 @@ public class ZLMHttpHookListener { } // 开启语音对讲通道 try { - playService.audioBroadcastCmd(device, channelId, 60, (msg)->{ + playService.audioBroadcastCmd(device, channelId, 60, mediaInfo, param.getApp(), param.getStream(), (msg)->{ logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId); }); } catch (InvalidArgumentException | ParseException | SipException e) { @@ -375,7 +374,7 @@ public class ZLMHttpHookListener { if (sendRtpItem == null) { // TODO 可能数据错误,重新开启语音通道 }else { - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); logger.info("rtp/{}开始向上级推流, 目标={}:{},SSRC={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc()); Map sendParam = new HashMap<>(12); sendParam.put("vhost","__defaultVhost__"); @@ -389,12 +388,12 @@ public class ZLMHttpHookListener { JSONObject jsonObject; if (sendRtpItem.isTcpActive()) { - jsonObject = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, sendParam); + jsonObject = zlmrtpServerFactory.startSendRtpPassive(mediaServerItem, sendParam); } else { sendParam.put("is_udp", sendRtpItem.isTcp() ? "0" : "1"); sendParam.put("dst_url", sendRtpItem.getIp()); sendParam.put("dst_port", sendRtpItem.getPort()); - jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, sendParam); + jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaServerItem, sendParam); } if (jsonObject != null && jsonObject.getInteger("code") == 0) { logger.info("[语音对讲] 自动推流成功, device: {}, channel: {}", deviceId, channelId); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index ffdaef3e..6d11afa9 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -12,9 +12,7 @@ import com.genersoft.iot.vmp.service.bean.PlayBackCallback; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import gov.nist.javax.sip.message.SIPResponse; -import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; import javax.sip.SipException; @@ -62,7 +60,7 @@ public interface IPlayService { AudioBroadcastResult audioBroadcast(Device device, String channelId); void stopAudioBroadcast(String deviceId, String channelId); - void audioBroadcastCmd(Device device, String channelId, int timeout, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException; + void audioBroadcastCmd(Device device, String channelId, int timeout, MediaServerItem mediaServerItem, String sourceApp, String sourceStream, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException; void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException; diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 56efa72a..4336664f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -1011,7 +1011,7 @@ public class PlayServiceImpl implements IPlayService { } @Override - public void audioBroadcastCmd(Device device, String channelId, int timeout, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException { + public void audioBroadcastCmd(Device device, String channelId, int timeout, MediaServerItem mediaServerItem, String sourceApp, String sourceStream, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException { if (device == null || channelId == null) { return; } @@ -1027,7 +1027,6 @@ public class PlayServiceImpl implements IPlayService { SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { // 查询流是否存在,不存在则认为是异常状态 - MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStreamId()); if (streamReady) { logger.warn("语音广播已经开启: {}", channelId); @@ -1042,7 +1041,8 @@ public class PlayServiceImpl implements IPlayService { // 发送通知 cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> { // 发送成功 - AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready); + AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, + AudioBroadcastCatchStatus.Ready, mediaServerItem, sourceApp, sourceStream); audioBroadcastManager.update(audioBroadcastCatch); }, eventResultForError -> { // 发送失败