diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 3bb6842b..d4df0ae5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -13,7 +13,6 @@ import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IPlayService; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -108,19 +107,16 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem.getRedisKey(), sendRtpItem); if (wvpResult.getCode() == 0) { - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(), - sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(), - sendRtpItem.getMediaServerId()); - messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); - redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel); + redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, parentPlatform); } } else { try { if (sendRtpItem.isTcpActive()) { - mediaServerService.startSendRtpPassive(mediaInfo, parentPlatform, sendRtpItem, null); + mediaServerService.startSendRtpPassive(mediaInfo,sendRtpItem, null); } else { - mediaServerService.startSendRtp(mediaInfo, parentPlatform, sendRtpItem); + mediaServerService.startSendRtp(mediaInfo, sendRtpItem); } + redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, parentPlatform); }catch (ControllerException e) { logger.error("RTP推流失败: {}", e.getMessage()); playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader); @@ -142,9 +138,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In } try { if (sendRtpItem.isTcpActive()) { - mediaServerService.startSendRtpPassive(mediaInfo, null, sendRtpItem, null); + mediaServerService.startSendRtpPassive(mediaInfo, sendRtpItem, null); } else { - mediaServerService.startSendRtp(mediaInfo, null, sendRtpItem); + mediaServerService.startSendRtp(mediaInfo, sendRtpItem); } }catch (ControllerException e) { logger.error("RTP推流失败: {}", e.getMessage()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index e3a25ec8..de1a9290 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -466,7 +466,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (sendRtpItem.isTcpActive()) { MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); try { - mediaServerService.startSendRtpPassive(mediaServer, platform, sendRtpItem, 5); + mediaServerService.startSendRtpPassive(mediaServer, sendRtpItem, 5); + redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, platform); }catch (ControllerException e) {} } } catch (SipException | InvalidArgumentException | ParseException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java index 46f9642c..bb1b3792 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java @@ -152,7 +152,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp }else { // 发流 try { - mediaServerService.startSendRtp(hookData.getMediaServer(),null, sendRtpItem); + mediaServerService.startSendRtp(hookData.getMediaServer(), sendRtpItem); }catch (ControllerException e) { logger.info("[语音喊话] 推流失败, 结果: {}", e.getMessage()); return; diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java index ceac0ec8..a8c53381 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java @@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.media.service; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.StreamInfo; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; @@ -141,9 +140,9 @@ public interface IMediaServerService { Boolean isStreamReady(MediaServer mediaServer, String rtp, String streamId); - void startSendRtpPassive(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem, Integer timeout); + void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout); - void startSendRtp(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem); + void startSendRtp(MediaServer mediaServer, SendRtpItem sendRtpItem); SendRtpItem createSendRtpItem(MediaServer mediaServerItem, String addressStr, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean mediaTransmissionTCP, boolean rtcp); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 2bfc5df4..a0b33413 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -7,8 +7,6 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.media.bean.MediaInfo; @@ -24,7 +22,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.MediaServerMapper; @@ -827,18 +824,17 @@ public class MediaServerServiceImpl implements IMediaServerService { } @Override - public void startSendRtpPassive(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem, Integer timeout) { + public void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout) { IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); if (mediaNodeServerService == null) { logger.info("[startSendRtpPassive] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类"); } mediaNodeServerService.startSendRtpPassive(mediaServer, sendRtpItem, timeout); - sendPlatformStartPlayMsg(platform, sendRtpItem); } @Override - public void startSendRtp(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem) { + public void startSendRtp(MediaServer mediaServer, SendRtpItem sendRtpItem) { IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); if (mediaNodeServerService == null) { logger.info("[startSendRtpStream] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); @@ -847,21 +843,6 @@ public class MediaServerServiceImpl implements IMediaServerService { logger.info("[开始推流] rtp/{}, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); mediaNodeServerService.startSendRtpStream(mediaServer, sendRtpItem); - if (platform != null) { - sendPlatformStartPlayMsg(platform, sendRtpItem); - } - - - } - - private void sendPlatformStartPlayMsg(ParentPlatform platform, SendRtpItem sendRtpItem) { - if (sendRtpItem.getPlayType() == InviteStreamType.PUSH && platform != null) { - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(), - sendRtpItem.getChannelId(), platform.getServerGBId(), platform.getName(), userSetting.getServerId(), - sendRtpItem.getMediaServerId()); - messageForPushChannel.setPlatFormIndex(platform.getId()); - redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel); - } } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 5375b7aa..43b4d66c 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -383,7 +383,7 @@ public class PlayServiceImpl implements IPlayService { }, userSetting.getPlayTimeout()); try { - mediaServerService.startSendRtpPassive(mediaServerItem, null, sendRtpItem, userSetting.getPlayTimeout() * 1000); + mediaServerService.startSendRtpPassive(mediaServerItem, sendRtpItem, userSetting.getPlayTimeout() * 1000); }catch (ControllerException e) { mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); logger.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channelId); @@ -1412,10 +1412,11 @@ public class PlayServiceImpl implements IPlayService { if (mediaInfo != null) { try { if (sendRtpItem.isTcpActive()) { - mediaServerService.startSendRtpPassive(mediaInfo, platform, sendRtpItem, null); + mediaServerService.startSendRtpPassive(mediaInfo, sendRtpItem, null); } else { - mediaServerService.startSendRtp(mediaInfo, platform, sendRtpItem); + mediaServerService.startSendRtp(mediaInfo, sendRtpItem); } + redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, platform); }catch (ControllerException e) { logger.error("RTP推流失败: {}", e.getMessage()); startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java index 4e12670e..adb23186 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java @@ -209,7 +209,7 @@ public class RedisRpcController { return response; } try { - mediaServerService.startSendRtp(mediaServer, null, sendRtpItem); + mediaServerService.startSendRtp(mediaServer, sendRtpItem); }catch (ControllerException exception) { logger.info("[redis-rpc] 发流失败: {}/{}, 目标地址: {}:{}, {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getMsg()); WVPResult wvpResult = WVPResult.fail(exception.getCode(), exception.getMsg()); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index d87c3423..2347f502 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -208,7 +208,7 @@ public interface IRedisCatchStorage { void sendChannelAddOrDelete(String deviceId, String channelId, boolean add); - void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel); + void sendPlatformStartPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform); void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index a7c5f72c..b743cfc6 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -656,10 +656,16 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } @Override - public void sendPlatformStartPlayMsg(MessageForPushChannel msg) { - String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY; - logger.info("[redis发送通知] 发送 推流被上级平台观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId()); - redisTemplate.convertAndSend(key, JSON.toJSON(msg)); + public void sendPlatformStartPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) { + if (sendRtpItem.getPlayType() == InviteStreamType.PUSH && platform != null) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(), + sendRtpItem.getChannelId(), platform.getServerGBId(), platform.getName(), userSetting.getServerId(), + sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(platform.getId()); + String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY; + logger.info("[redis发送通知] 发送 推流被上级平台观看 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId()); + redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel)); + } } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java index 74e155b4..a587d5df 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java @@ -1,19 +1,17 @@ package com.genersoft.iot.vmp.vmanager.ps; -import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.security.JwtUtils; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.event.hook.Hook; -import com.genersoft.iot.vmp.media.event.hook.HookType; -import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; +import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -210,7 +208,7 @@ public class PsController { SendRtpItem sendRtpItem = SendRtpItem.getInstance(app, stream, ssrc, dstIp, dstPort, !isUdp, sendInfo.getSendLocalPort(), null); Boolean streamReady = mediaServerService.isStreamReady(mediaServer, app, stream); if (streamReady) { - mediaServerService.startSendRtp(mediaServer, null, sendRtpItem); + mediaServerService.startSendRtp(mediaServer, sendRtpItem); logger.info("[第三方PS服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, sendRtpItem); redisTemplate.opsForValue().set(key, sendInfo); }else { @@ -235,7 +233,7 @@ public class PsController { } catch (InterruptedException e) { throw new RuntimeException(e); } - mediaServerService.startSendRtp(mediaServer, null, sendRtpItem); + mediaServerService.startSendRtp(mediaServer, sendRtpItem); logger.info("[第三方PS服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, sendRtpItem); redisTemplate.opsForValue().set(key, finalSendInfo); hookSubscribe.removeSubscribe(hook); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java index e08f769a..7832e373 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -1,18 +1,17 @@ package com.genersoft.iot.vmp.vmanager.rtp; -import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.security.JwtUtils; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.event.hook.Hook; -import com.genersoft.iot.vmp.media.event.hook.HookType; -import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; +import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -31,9 +30,7 @@ import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; import java.io.IOException; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -247,12 +244,12 @@ public class RtpController { Boolean streamReady = mediaServerService.isStreamReady(mediaServer, app, stream); if (streamReady) { if (sendRtpItemForVideo != null) { - mediaServerService.startSendRtp(mediaServer, null, sendRtpItemForVideo); + mediaServerService.startSendRtp(mediaServer, sendRtpItemForVideo); logger.info("[第三方服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, sendRtpItemForVideo); redisTemplate.opsForValue().set(key, sendInfo); } if(sendRtpItemForAudio != null) { - mediaServerService.startSendRtp(mediaServer, null, sendRtpItemForAudio); + mediaServerService.startSendRtp(mediaServer, sendRtpItemForAudio); logger.info("[第三方服务对接->发送流] 音频流发流成功,callId->{},param->{}", callId, sendRtpItemForAudio); redisTemplate.opsForValue().set(key, sendInfo); } @@ -279,12 +276,12 @@ public class RtpController { throw new RuntimeException(e); } if (sendRtpItemForVideo != null) { - mediaServerService.startSendRtp(mediaServer, null, sendRtpItemForVideo); + mediaServerService.startSendRtp(mediaServer, sendRtpItemForVideo); logger.info("[第三方服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, sendRtpItemForVideo); redisTemplate.opsForValue().set(key, finalSendInfo); } if(sendRtpItemForAudio != null) { - mediaServerService.startSendRtp(mediaServer, null, sendRtpItemForAudio); + mediaServerService.startSendRtp(mediaServer, sendRtpItemForAudio); logger.info("[第三方服务对接->发送流] 音频流发流成功,callId->{},param->{}", callId, sendRtpItemForAudio); redisTemplate.opsForValue().set(key, finalSendInfo); }