From ad5b66eb71a812dc5b2aa32f5264876458669cec Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 4 Sep 2024 18:01:54 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/gb28181/bean/AudioBroadcastCatch.java | 82 +---- .../service/impl/DeviceServiceImpl.java | 2 +- .../service/impl/PlatformServiceImpl.java | 5 +- .../gb28181/service/impl/PlayServiceImpl.java | 10 +- .../session/AudioBroadcastManager.java | 70 +--- .../session/VideoStreamSessionManager.java | 2 +- .../cmd/ISIPCommanderForPlatform.java | 4 +- .../cmd/impl/SIPCommanderFroPlatform.java | 18 +- .../request/impl/ByeRequestProcessor.java | 6 +- .../request/impl/InviteRequestProcessor.java | 306 ++---------------- .../cmd/BroadcastNotifyMessageHandler.java | 2 +- .../cmd/BroadcastResponseMessageHandler.java | 2 +- .../media/service/IMediaServerService.java | 4 +- .../service/impl/MediaServerServiceImpl.java | 5 +- .../vmp/service/IReceiveRtpServerService.java | 4 + .../iot/vmp/service/bean/RTPServerParam.java | 19 ++ .../service/impl/RtpServerServiceImpl.java | 62 ++++ .../redisMsg/control/RedisRpcController.java | 34 -- 18 files changed, 161 insertions(+), 476 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/bean/RTPServerParam.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java index bc65d951..e1c9c9c7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java @@ -1,20 +1,22 @@ package com.genersoft.iot.vmp.gb28181.bean; -import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.gb28181.controller.bean.AudioBroadcastEvent; +import com.genersoft.iot.vmp.media.bean.MediaServer; import gov.nist.javax.sip.message.SIPResponse; +import lombok.Data; /** * 缓存语音广播的状态 * @author lin */ +@Data public class AudioBroadcastCatch { public AudioBroadcastCatch( String deviceId, - String channelId, + Integer channelId, MediaServer mediaServerItem, String app, String stream, @@ -43,7 +45,7 @@ public class AudioBroadcastCatch { /** * 通道编号 */ - private String channelId; + private Integer channelId; /** * 流媒体信息 @@ -81,79 +83,7 @@ public class AudioBroadcastCatch { private AudioBroadcastEvent event; - public String getDeviceId() { - return deviceId; - } - - public void setDeviceId(String deviceId) { - this.deviceId = deviceId; - } - - public String getChannelId() { - return channelId; - } - - public void setChannelId(String channelId) { - this.channelId = channelId; - } - - public AudioBroadcastCatchStatus getStatus() { - return status; - } - - public void setStatus(AudioBroadcastCatchStatus status) { - this.status = status; - } - - public SipTransactionInfo getSipTransactionInfo() { - return sipTransactionInfo; - } - - public MediaServer getMediaServerItem() { - return mediaServerItem; - } - - public void setMediaServerItem(MediaServer mediaServerItem) { - this.mediaServerItem = mediaServerItem; - } - - public String getApp() { - return app; - } - - public void setApp(String app) { - this.app = app; - } - - public String getStream() { - return stream; - } - - public void setStream(String stream) { - this.stream = stream; - } - - public boolean isFromPlatform() { - return isFromPlatform; - } - - public void setFromPlatform(boolean fromPlatform) { - isFromPlatform = fromPlatform; - } - - public void setSipTransactionInfo(SipTransactionInfo sipTransactionInfo) { - this.sipTransactionInfo = sipTransactionInfo; - } - - public AudioBroadcastEvent getEvent() { - return event; - } - - public void setEvent(AudioBroadcastEvent event) { - this.event = event; - } - - public void setSipTransactionInfoByRequset(SIPResponse sipResponse) { + public void setSipTransactionInfoByRequest(SIPResponse sipResponse) { this.sipTransactionInfo = new SipTransactionInfo(sipResponse); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 51db16a0..827ac240 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -235,7 +235,7 @@ public class DeviceServiceImpl implements IDeviceService { removeCatalogSubscribe(device, null); removeMobilePositionSubscribe(device, null); - List audioBroadcastCatches = audioBroadcastManager.get(deviceId); + List audioBroadcastCatches = audioBroadcastManager.getByDeviceId(deviceId); if (audioBroadcastCatches.size() > 0) { for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatches) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index 1a4fd841..97c5d1b9 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -112,7 +112,6 @@ public class PlatformServiceImpl implements IPlatformService { if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) { String platformId = sendRtpItem.getPlatformId(); Platform platform = platformMapper.getParentPlatByServerGBId(platformId); - try { if (platform != null) { commanderForPlatform.streamByeCmd(platform, sendRtpItem); @@ -552,7 +551,7 @@ public class PlatformServiceImpl implements IPlatformService { log.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channel.getGbDeviceId(), ssrcInfo.getPort(), ssrcInfo.getSsrc()); // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 try { - commanderForPlatform.streamByeCmd(platform, channel.getGbDeviceId(), ssrcInfo.getStream(), null, null); + commanderForPlatform.streamByeCmd(platform, channel, ssrcInfo.getStream(), null, null); } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { log.error("[点播超时], 发送BYE失败 {}", e.getMessage()); } finally { @@ -564,7 +563,7 @@ public class PlatformServiceImpl implements IPlatformService { } } }, userSetting.getPlayTimeout()); - commanderForPlatform.broadcastInviteCmd(platform, channel.getGbDeviceId(), mediaServerItem, ssrcInfo, (hookData)->{ + commanderForPlatform.broadcastInviteCmd(platform, channel, mediaServerItem, ssrcInfo, (hookData)->{ log.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channel.getGbDeviceId()); dynamicTask.stop(timeOutTaskKey); // hook响应 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index b1d53f3b..48f0c5a8 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -180,11 +180,11 @@ public class PlayServiceImpl implements IPlayService { cmder.streamByeCmd(device, sendRtpItem.getChannelId(), event.getStream(), sendRtpItem.getCallId()); if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.getByDeviceId(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); if (audioBroadcastCatch != null) { // 来自上级平台的停止对讲 log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + audioBroadcastManager.del(sendRtpItem.getChannelId()); } } } @@ -1284,9 +1284,9 @@ public class PlayServiceImpl implements IPlayService { log.info("[停止对讲] 设备:{}, 通道:{}", deviceId, channelId); List audioBroadcastCatchList = new ArrayList<>(); if (channelId == null) { - audioBroadcastCatchList.addAll(audioBroadcastManager.get(deviceId)); + audioBroadcastCatchList.addAll(audioBroadcastManager.getByDeviceId(deviceId)); } else { - audioBroadcastCatchList.add(audioBroadcastManager.get(deviceId, channelId)); + audioBroadcastCatchList.add(audioBroadcastManager.getByDeviceId(deviceId, channelId)); } if (audioBroadcastCatchList.size() > 0) { for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatchList) { @@ -1450,7 +1450,7 @@ public class PlayServiceImpl implements IPlayService { public void startSendRtpStreamFailHand(SendRtpItem sendRtpItem, Platform platform, CallIdHeader callIdHeader) { if (sendRtpItem.isOnlyAudio()) { Device device = deviceService.getDeviceByDeviceId(sendRtpItem.getDeviceId()); - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getChannelId()); if (audioBroadcastCatch != null) { try { cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java index c38f18ea..7dc9ca24 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java @@ -7,10 +7,11 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; -import java.util.stream.Stream; /** * 语音广播消息管理类 @@ -23,81 +24,42 @@ public class AudioBroadcastManager { @Autowired private SipConfig config; - public static Map data = new ConcurrentHashMap<>(); + public static Map data = new ConcurrentHashMap<>(); public void update(AudioBroadcastCatch audioBroadcastCatch) { - if (SipUtils.isFrontEnd(audioBroadcastCatch.getDeviceId())) { - audioBroadcastCatch.setChannelId(audioBroadcastCatch.getDeviceId()); - data.put(audioBroadcastCatch.getDeviceId(), audioBroadcastCatch); - }else { - data.put(audioBroadcastCatch.getDeviceId() + audioBroadcastCatch.getChannelId(), audioBroadcastCatch); - } + data.put(audioBroadcastCatch.getChannelId(), audioBroadcastCatch); } - public void del(String deviceId, String channelId) { - if (SipUtils.isFrontEnd(deviceId)) { - data.remove(deviceId); - }else { - data.remove(deviceId + channelId); - } + public void del(Integer channelId) { + data.remove(channelId); } - public void delByDeviceId(String deviceId) { - for (String key : data.keySet()) { - if (key.startsWith(deviceId)) { - data.remove(key); - } - } - } - public List getAll(){ Collection values = data.values(); return new ArrayList<>(values); } - public boolean exit(String deviceId, String channelId) { - for (String key : data.keySet()) { - if (SipUtils.isFrontEnd(deviceId)) { - return key.equals(deviceId); - }else { - return key.equals(deviceId + channelId); - } - } - return false; + public boolean exit(Integer channelId) { + return data.containsKey(channelId); } - public AudioBroadcastCatch get(String deviceId, String channelId) { - AudioBroadcastCatch audioBroadcastCatch; - if (SipUtils.isFrontEnd(deviceId)) { - audioBroadcastCatch = data.get(deviceId); - }else { - audioBroadcastCatch = data.get(deviceId + channelId); - } - if (audioBroadcastCatch == null) { - Stream allAudioBroadcastCatchStreamForDevice = data.values().stream().filter( - audioBroadcastCatchItem -> Objects.equals(audioBroadcastCatchItem.getDeviceId(), deviceId)); - List audioBroadcastCatchList = allAudioBroadcastCatchStreamForDevice.collect(Collectors.toList()); - if (audioBroadcastCatchList.size() == 1 && Objects.equals(config.getId(), channelId)) { - audioBroadcastCatch = audioBroadcastCatchList.get(0); - } - } - - return audioBroadcastCatch; + public AudioBroadcastCatch get(Integer channelId) { + return data.get(channelId); } - public List get(String deviceId) { + public List getByDeviceId(String deviceId) { List audioBroadcastCatchList= new ArrayList<>(); if (SipUtils.isFrontEnd(deviceId)) { if (data.get(deviceId) != null) { audioBroadcastCatchList.add(data.get(deviceId)); } }else { - for (String key : data.keySet()) { - if (key.startsWith(deviceId)) { - audioBroadcastCatchList.add(data.get(key)); + for (AudioBroadcastCatch broadcastCatch : data.values()) { + if (broadcastCatch.getDeviceId().equals(deviceId)) { + audioBroadcastCatchList.add(broadcastCatch); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java index 86f4bf41..9fe32f59 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java @@ -38,7 +38,7 @@ public class VideoStreamSessionManager { * @param mediaServerId 所使用的流媒体ID * @param response 回复 */ - public void put(String deviceId, String channelId, String callId, String stream, String ssrc, String mediaServerId, SIPResponse response, InviteSessionType type){ + public void put(String deviceId, Integer channelId, String callId, String stream, String ssrc, String mediaServerId, SIPResponse response, InviteSessionType type){ SsrcTransaction ssrcTransaction = new SsrcTransaction(); ssrcTransaction.setDeviceId(deviceId); ssrcTransaction.setChannelId(channelId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java index 7eefe8eb..6e9666e7 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java @@ -152,9 +152,9 @@ public interface ISIPCommanderForPlatform { void streamByeCmd(Platform platform, SendRtpItem sendRtpItem) throws SipException, InvalidArgumentException, ParseException; - void streamByeCmd(Platform platform, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException; + void streamByeCmd(Platform platform, CommonGBChannel channel, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException; - void broadcastInviteCmd(Platform platform, String channelId, MediaServer mediaServerItem, + void broadcastInviteCmd(Platform platform, CommonGBChannel channel, MediaServer mediaServerItem, SSRCInfo ssrcInfo, HookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 01a776ad..4b4d47ac 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -656,17 +656,17 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } @Override - public void streamByeCmd(Platform platform, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException { - SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(platform.getServerGBId(), channelId, callId, stream); + public void streamByeCmd(Platform platform, CommonGBChannel channel, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException { + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(channel.getGbId(), callId, stream); if (ssrcTransaction == null) { - throw new SsrcTransactionNotFoundException(platform.getServerGBId(), channelId, callId, stream); + throw new SsrcTransactionNotFoundException(platform.getServerGBId(), channel.getGbDeviceId(), callId, stream); } mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream()); streamSession.remove(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); - Request byteRequest = headerProviderPlatformProvider.createByteRequest(platform, channelId, ssrcTransaction.getSipTransactionInfo()); + Request byteRequest = headerProviderPlatformProvider.createByteRequest(platform, channel.getGbDeviceId(), ssrcTransaction.getSipTransactionInfo()); sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), byteRequest, null, okEvent); } @@ -694,7 +694,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } @Override - public void broadcastInviteCmd(Platform platform, String channelId, MediaServer mediaServerItem, + public void broadcastInviteCmd(Platform platform, CommonGBChannel channel, MediaServer mediaServerItem, SSRCInfo ssrcInfo, HookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException { String stream = ssrcInfo.getStream(); @@ -715,7 +715,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); - content.append("o=" + channelId + " 0 0 IN IP4 " + sdpIp + "\r\n"); + content.append("o=" + channel.getGbDeviceId() + " 0 0 IN IP4 " + sdpIp + "\r\n"); content.append("s=Play\r\n"); content.append("c=IN IP4 " + sdpIp + "\r\n"); content.append("t=0 0\r\n"); @@ -742,18 +742,18 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(platform.getDeviceIp()), platform.getTransport()); - Request request = headerProviderPlatformProvider.createInviteRequest(platform, channelId, + Request request = headerProviderPlatformProvider.createInviteRequest(platform, channel.getGbDeviceId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), ssrcInfo.getSsrc(), callIdHeader); sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> { - streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); + streamSession.remove(platform.getServerGBId(), channel.getGbId(), ssrcInfo.getStream()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); subscribe.removeSubscribe(hook); errorEvent.response(e); }), e -> { ResponseEvent responseEvent = (ResponseEvent) e.event; SIPResponse response = (SIPResponse) responseEvent.getResponse(); - streamSession.put(platform.getServerGBId(), channelId, callIdHeader.getCallId(), stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.BROADCAST); + streamSession.put(platform.getServerGBId(), channel.getGbId(), callIdHeader.getCallId(), stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.BROADCAST); okEvent.response(e); }); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 501a39fa..601d20ae 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -152,11 +152,11 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In } MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaServer != null) { - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.getByDeviceId(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) { // 来自上级平台的停止对讲 log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + audioBroadcastManager.del(sendRtpItem.getChannelId()); } MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), streamId); @@ -231,7 +231,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In case TALK: // 查找来源的对讲设备,发送停止 Device sourceDevice = storager.queryVideoDeviceByPlatformIdAndChannelId(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId()); - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(ssrcTransaction.getDeviceId(), channel.getDeviceId()); + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.getByDeviceId(ssrcTransaction.getDeviceId(), channel.getDeviceId()); if (sourceDevice != null) { playService.stopAudioBroadcast(sourceDevice.getDeviceId(), channel.getDeviceId()); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index c6d69beb..964fe009 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -1,7 +1,5 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; @@ -9,10 +7,7 @@ import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService; -import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; -import com.genersoft.iot.vmp.gb28181.service.IPlatformService; -import com.genersoft.iot.vmp.gb28181.service.IPlayService; +import com.genersoft.iot.vmp.gb28181.service.*; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; @@ -21,15 +16,11 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; -import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; -import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -56,8 +47,6 @@ import javax.sip.header.CallIdHeader; import javax.sip.message.Response; import java.text.ParseException; import java.time.Instant; -import java.util.Map; -import java.util.Random; import java.util.Vector; /** @@ -76,6 +65,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private IVideoManagerStorage storager; + @Autowired + private IDeviceChannelService deviceChannelService; + @Autowired private IGbChannelService channelService; @@ -193,7 +185,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(streamInfo.getMediaServer(), inviteInfo.getIp(), inviteInfo.getPort(), inviteInfo.getSsrc(), platform.getServerGBId(), streamInfo.getApp(), streamInfo.getStream(), - channel.getGbDeviceId(), inviteInfo.isTcp(), platform.isRtcp()); + channel.getGbId(), inviteInfo.isTcp(), platform.isRtcp()); if (inviteInfo.isTcp() && inviteInfo.isTcpActive()) { sendRtpItem.setTcpActive(true); } @@ -744,263 +736,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } - private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, Platform parentPlatform, - JSONObject jsonObject, Map param, CallIdHeader callIdHeader) { - if (jsonObject == null) { - log.error("下级TCP被动启动监听失败: 请检查ZLM服务"); - } else if (jsonObject.getInteger("code") == 0) { - log.info("调用ZLM-TCP被动推流接口, 结果: {}", jsonObject); - log.info("启动监听TCP被动推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); - } else { - log.error("启动监听TCP被动推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param)); - } - } - - /** - * 安排推流 - */ - private void sendProxyStream(SendRtpItem sendRtpItem, MediaServer mediaServerItem, Platform platform, SIPRequest request) { - MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); - - if (mediaInfo != null) { - - // 自平台内容 - int localPort = sendRtpPortManager.getNextPort(mediaServerItem); - if (localPort == 0) { - log.warn("服务器端口资源不足"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); - } - return; - } - sendRtpItem.setPlayType(InviteStreamType.PROXY); - // 写入redis, 超时时回复 - sendRtpItem.setStatus(1); - sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); - - SIPResponse response = sendStreamAck(request, sendRtpItem, platform); - if (response != null) { - sendRtpItem.setToTag(response.getToTag()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); - } - } - - private void sendPushStream(SendRtpItem sendRtpItem, MediaServer mediaServerItem, Platform platform, SIPRequest request) { - // 推流 - if (sendRtpItem.getServerId().equals(userSetting.getServerId())) { - MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); - if (mediaInfo != null ) { - // 自平台内容 - int localPort = sendRtpPortManager.getNextPort(mediaServerItem); - if (localPort == 0) { - log.warn("服务器端口资源不足"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); - } - return; - } - // 写入redis, 超时时回复 - sendRtpItem.setStatus(1); - SIPResponse response = sendStreamAck(request, sendRtpItem, platform); - if (response != null) { - sendRtpItem.setToTag(response.getToTag()); - } - if (sendRtpItem.getSsrc() == null) { - // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 - String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); - sendRtpItem.setSsrc(ssrc); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); - } else { - // 不在线 拉起 - notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); - } - } else { - // 其他平台内容 - otherWvpPushStream(sendRtpItem, request, platform); - } - } - - /** - * 通知流上线 - */ - private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServer mediaServerItem, Platform platform, SIPRequest request) { - // TODO 控制启用以使设备上线 - log.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", sendRtpItem.getApp(), sendRtpItem.getStream()); - // 监听流上线 - Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), mediaServerItem.getId()); - hookSubscribe.addSubscribe(hook, (hookData)->{ - log.info("[上级点播]拉流代理已经就绪, {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); - dynamicTask.stop(sendRtpItem.getCallId()); - sendProxyStream(sendRtpItem, mediaServerItem, platform, request); - }); - dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { - log.info("[ app={}, stream={} ] 等待拉流代理流超时", sendRtpItem.getApp(), sendRtpItem.getStream()); - hookSubscribe.removeSubscribe(hook); - }, userSetting.getPlatformPlayTimeout()); - boolean start = streamProxyService.startByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); - if (!start) { - try { - responseAck(request, Response.BUSY_HERE, "channel [" + sendRtpItem.getChannelId() + "] offline"); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); - } - hookSubscribe.removeSubscribe(hook); - dynamicTask.stop(sendRtpItem.getCallId()); - } - } - - /** - * 通知流上线 - */ - private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServer mediaServerItem, Platform platform, SIPRequest request) { - // 发送redis消息以使设备上线,流上线后被 - log.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", sendRtpItem.getApp(), sendRtpItem.getStream()); - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, - sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(), - platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); - redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); - // 设置超时 - dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { - redisRpcService.stopWaitePushStreamOnline(sendRtpItem); - log.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream()); - try { - responseAck(request, Response.REQUEST_TIMEOUT); // 超时 - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("未处理的异常 ", e); - } - }, userSetting.getPlatformPlayTimeout()); - // - long key = redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> { - dynamicTask.stop(sendRtpItem.getCallId()); - if (sendRtpItemKey == null) { - log.warn("[级联点播] 等待推流得到结果未空: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("未处理的异常 ", e); - } - return; - } - SendRtpItem sendRtpItemFromRedis = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey); - if (sendRtpItemFromRedis == null) { - log.warn("[级联点播] 等待推流, 未找到redis中缓存的发流信息: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("未处理的异常 ", e); - } - return; - } - if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { - log.info("[级联点播] 等待的推流在本平台上线 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); - int localPort = sendRtpPortManager.getNextPort(mediaServerItem); - if (localPort == 0) { - log.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("未处理的异常 ", e); - } - return; - } - sendRtpItem.setLocalPort(localPort); - if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { - sendRtpItem.setLocalIp(platform.getSendStreamIp()); - } - - // 写入redis, 超时时回复 - sendRtpItem.setStatus(1); - SIPResponse response = sendStreamAck(request, sendRtpItem, platform); - if (response != null) { - sendRtpItem.setToTag(response.getToTag()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); - } else { - // 其他平台内容 - otherWvpPushStream(sendRtpItemFromRedis, request, platform); - } - }); - // 添加回复的拒绝或者错误的通知 - // redis消息例如: PUBLISH VM_MSG_STREAM_PUSH_RESPONSE '{"code":1,"msg":"失败","app":"1","stream":"2"}' - redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { - if (response.getCode() != 0) { - dynamicTask.stop(sendRtpItem.getCallId()); - redisRpcService.stopWaitePushStreamOnline(sendRtpItem); - redisRpcService.removeCallback(key); - try { - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage()); - } - } - }); - } - - - - /** - * 来自其他wvp的推流 - */ - private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, Platform platform) { - log.info("[级联点播] 来自其他wvp的推流 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); - sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem.getRedisKey()); - if (sendRtpItem == null) { - return; - } - // 写入redis, 超时时回复 - sendRtpItem.setStatus(1); - SIPResponse response = sendStreamAck(request, sendRtpItem, platform); - if (response != null) { - sendRtpItem.setToTag(response.getToTag()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); - } - - public SIPResponse sendStreamAck(SIPRequest request, SendRtpItem sendRtpItem, Platform platform) { - - String sdpIp = sendRtpItem.getLocalIp(); - if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { - sdpIp = platform.getSendStreamIp(); - } - StringBuffer content = new StringBuffer(200); - content.append("v=0\r\n"); - content.append("o=" + sendRtpItem.getChannelId() + " 0 0 IN IP4 " + sdpIp + "\r\n"); - content.append("s=Play\r\n"); - content.append("c=IN IP4 " + sdpIp + "\r\n"); - content.append("t=0 0\r\n"); - // 非严格模式端口不统一, 增加兼容性,修改为一个不为0的端口 - int localPort = sendRtpItem.getLocalPort(); - if (localPort == 0) { - localPort = new Random().nextInt(65535) + 1; - } - content.append("m=video " + localPort + " RTP/AVP 96\r\n"); - content.append("a=sendonly\r\n"); - content.append("a=rtpmap:96 PS/90000\r\n"); - if (sendRtpItem.isTcp()) { - content.append("a=connection:new\r\n"); - if (!sendRtpItem.isTcpActive()) { - content.append("a=setup:active\r\n"); - } else { - content.append("a=setup:passive\r\n"); - } - } - content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); - content.append("f=\r\n"); - - try { - return responseSdpAck(request, content.toString(), platform); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("未处理的异常 ", e); - } - return null; - } - public void inviteFromDeviceHandle(SIPRequest request, String requesterId, String channelId) { String realChannelId = null; @@ -1032,8 +767,17 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } return; } - - AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(device.getDeviceId(), realChannelId); + DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), realChannelId); + if (deviceChannel == null) { + log.warn("来自设备的Invite请求,无法从请求信息中确定所属通道,已忽略,requesterId: {}/{}", requesterId, realChannelId); + try { + responseAck(request, Response.FORBIDDEN); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] 来自设备的Invite请求,无法从请求信息中确定所属设备 FORBIDDEN: {}", e.getMessage()); + } + return; + } + AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(deviceChannel.getId()); if (broadcastCatch == null) { log.warn("来自设备的Invite请求非语音广播,已忽略,requesterId: {}/{}", requesterId, channelId); try { @@ -1054,7 +798,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(request, Response.TRYING); } catch (SipException | InvalidArgumentException | ParseException e) { log.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); - playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId()); + playService.stopAudioBroadcast(device.getDeviceId(), deviceChannel.getDeviceId()); return; } String contentString = new String(request.getRawContent()); @@ -1099,7 +843,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(request, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 } catch (SipException | InvalidArgumentException | ParseException e) { log.error("[命令发送失败] invite 不支持的媒体格式: {}", e.getMessage()); - playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId()); + playService.stopAudioBroadcast(device.getDeviceId(), deviceChannel.getDeviceId()); return; } return; @@ -1115,7 +859,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(request, Response.BUSY_HERE); } catch (SipException | InvalidArgumentException | ParseException e) { log.error("[命令发送失败] invite 未找到可用的zlm: {}", e.getMessage()); - playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId()); + playService.stopAudioBroadcast(device.getDeviceId(), deviceChannel.getDeviceId()); } return; } @@ -1124,7 +868,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId, - device.getDeviceId(), broadcastCatch.getChannelId(), + device.getDeviceId(), deviceChannel.getId(), mediaTransmissionTCP, false); if (sendRtpItem == null) { @@ -1133,7 +877,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(request, Response.BUSY_HERE); } catch (SipException | InvalidArgumentException | ParseException e) { log.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); - playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId()); + playService.stopAudioBroadcast(device.getDeviceId(), deviceChannel.getDeviceId()); return; } return; @@ -1168,11 +912,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements log.error("[命令发送失败] 语音通话 回复410失败, {}", e.getMessage()); return; } - playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId()); + playService.stopAudioBroadcast(device.getDeviceId(), deviceChannel.getDeviceId()); } } catch (SdpException e) { log.error("[SDP解析异常]", e); - playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId()); + playService.stopAudioBroadcast(device.getDeviceId(), deviceChannel.getDeviceId()); } } else { log.warn("来自无效设备/平台的请求"); @@ -1224,10 +968,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sipResponse = responseSdpAck(request, content.toString(), parentPlatform); - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), sendRtpItem.getChannelId()); + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getChannelId()); audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok); - audioBroadcastCatch.setSipTransactionInfoByRequset(sipResponse); + audioBroadcastCatch.setSipTransactionInfoByRequest(sipResponse); audioBroadcastManager.update(audioBroadcastCatch); streamSession.put(device.getDeviceId(), sendRtpItem.getChannelId(), request.getCallIdHeader().getCallId(), sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(), sipResponse, InviteSessionType.BROADCAST); // 开启发流,大华在收到200OK后就会开始建立连接 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java index 76d9ac4b..81052652 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java @@ -127,7 +127,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp try { platformService.broadcastInvite(platform, channel, mediaServerForMinimumLoad, (hookData)->{ // 上级平台推流成功 - AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(device.getDeviceId(), targetId); + AudioBroadcastCatch broadcastCatch = audioBroadcastManager.getByDeviceId(device.getDeviceId(), targetId); if (broadcastCatch != null ) { if (playService.audioBroadcastInUse(device, targetId)) { log.info("[国标级联] 语音喊话 设备正在使用中 platform: {}, channel: {}", diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java index ea65db4a..f639c075 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java @@ -70,7 +70,7 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i // 回复200 OK responseAck(request, Response.OK); if (result.equalsIgnoreCase("OK")) { - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId); + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.getByDeviceId(device.getDeviceId(), channelId); audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite); audioBroadcastManager.update(audioBroadcastCatch); }else { diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java index 1a743b2f..48b81bc8 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java @@ -146,10 +146,10 @@ public interface IMediaServerService { void startSendRtp(MediaServer mediaServer, SendRtpItem sendRtpItem); - SendRtpItem createSendRtpItem(MediaServer mediaServerItem, String addressStr, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean mediaTransmissionTCP, boolean rtcp); + SendRtpItem createSendRtpItem(MediaServer mediaServerItem, String addressStr, int port, String ssrc, String requesterId, String deviceId, Integer channelId, boolean mediaTransmissionTCP, boolean rtcp); SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId, - String app, String stream, String channelId, boolean tcp, boolean rtcp); + String app, String stream, Integer channelId, boolean tcp, boolean rtcp); MediaServer getMediaServerByAppAndStream(String app, String stream); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 7c56b27d..1efeca7a 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -852,7 +852,7 @@ public class MediaServerServiceImpl implements IMediaServerService { } @Override - public SendRtpItem createSendRtpItem(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean isTcp, boolean rtcp) { + public SendRtpItem createSendRtpItem(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, String deviceId, Integer channelId, boolean isTcp, boolean rtcp) { int localPort = sendRtpPortManager.getNextPort(mediaServer); if (localPort == 0) { return null; @@ -861,7 +861,6 @@ public class MediaServerServiceImpl implements IMediaServerService { sendRtpItem.setIp(ip); sendRtpItem.setPort(port); sendRtpItem.setSsrc(ssrc); - sendRtpItem.setPlatformId(deviceId); sendRtpItem.setDeviceId(deviceId); sendRtpItem.setChannelId(channelId); sendRtpItem.setTcp(isTcp); @@ -875,7 +874,7 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId, - String app, String stream, String channelId, boolean tcp, boolean rtcp){ + String app, String stream, Integer channelId, boolean tcp, boolean rtcp){ int localPort = sendRtpPortManager.getNextPort(serverItem); if (localPort <= 0) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java new file mode 100644 index 00000000..ad7cbbef --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java @@ -0,0 +1,4 @@ +package com.genersoft.iot.vmp.service; + +public interface IReceiveRtpServerService { +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/RTPServerParam.java b/src/main/java/com/genersoft/iot/vmp/service/bean/RTPServerParam.java new file mode 100644 index 00000000..54839e89 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/RTPServerParam.java @@ -0,0 +1,19 @@ +package com.genersoft.iot.vmp.service.bean; + +import com.genersoft.iot.vmp.media.bean.MediaServer; +import lombok.Data; + +@Data +public class RTPServerParam { + + MediaServer mediaServerItem; + String streamId; + String presetSsrc; + boolean ssrcCheck; + boolean isPlayback; + Integer port; + Boolean onlyAuto; + Boolean disableAudio; + Boolean reUsePort; + Integer tcpMode; +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java new file mode 100644 index 00000000..42d65f39 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java @@ -0,0 +1,62 @@ +package com.genersoft.iot.vmp.service.impl; + +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; +import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IReceiveRtpServerService; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import com.genersoft.iot.vmp.service.bean.RTPServerParam; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; +import java.text.ParseException; +import java.util.List; + +@Service +public class RtpServerServiceImpl implements IReceiveRtpServerService { + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private DynamicTask dynamicTask; + + /** + * 流到来的处理 + */ + @Async("taskExecutor") + @org.springframework.context.event.EventListener + public void onApplicationEvent(MediaArrivalEvent event) { + + } + + /** + * 流离开的处理 + */ + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaDepartureEvent event) { + + } + + @Override + public SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback callback) { + // 开启流到来的监听 + // 设置流超时的定时任务 + // 调用节点,开启端口监听 + + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java index 43b8bf3a..c7cdda6b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java @@ -8,7 +8,6 @@ import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; -import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.service.IPlatformService; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; @@ -28,10 +27,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; -import javax.sip.InvalidArgumentException; -import javax.sip.SipException; -import java.text.ParseException; - /** * 其他wvp发起的rpc调用,这里的方法被 RedisRpcConfig 通过反射寻找对应的方法名称调用 */ @@ -303,35 +298,6 @@ public class RedisRpcController { return response; } - /** - * 其他wvp通知推流已经停止了 - */ - public RedisRpcResponse rtpSendStopped(RedisRpcRequest request) { - String sendRtpItemKey = request.getParam().toString(); - SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); - RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); - if (sendRtpItem == null) { - log.info("[redis-rpc] 推流已经停止, 未找到redis中的发流信息, key:{}", sendRtpItemKey); - return response; - } - log.info("[redis-rpc] 推流已经停止: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); - String platformId = sendRtpItem.getPlatformId(); - Platform platform = platformService.queryPlatformByServerGBId(platformId); - if (platform == null) { - return response; - } - try { - commanderFroPlatform.streamByeCmd(platform, sendRtpItem); - redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), - sendRtpItem.getCallId(), sendRtpItem.getStream()); - redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 发送BYE: {}", e.getMessage()); - } - return response; - } - private void sendResponse(RedisRpcResponse response){ log.info("[redis-rpc] >> {}", response); response.setToId(userSetting.getServerId());