调整上级观看消息的发送

pull/1489/head
648540858 2024-05-29 15:02:51 +08:00
parent bec35ebf94
commit 764d04b497
11 changed files with 43 additions and 64 deletions

View File

@ -13,7 +13,6 @@ import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@ -108,19 +107,16 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem.getRedisKey(), sendRtpItem);
if (wvpResult.getCode() == 0) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(),
sendRtpItem.getMediaServerId());
messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, parentPlatform);
}
} else {
try {
if (sendRtpItem.isTcpActive()) {
mediaServerService.startSendRtpPassive(mediaInfo, parentPlatform, sendRtpItem, null);
mediaServerService.startSendRtpPassive(mediaInfo,sendRtpItem, null);
} else {
mediaServerService.startSendRtp(mediaInfo, parentPlatform, sendRtpItem);
mediaServerService.startSendRtp(mediaInfo, sendRtpItem);
}
redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, parentPlatform);
}catch (ControllerException e) {
logger.error("RTP推流失败: {}", e.getMessage());
playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader);
@ -142,9 +138,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
}
try {
if (sendRtpItem.isTcpActive()) {
mediaServerService.startSendRtpPassive(mediaInfo, null, sendRtpItem, null);
mediaServerService.startSendRtpPassive(mediaInfo, sendRtpItem, null);
} else {
mediaServerService.startSendRtp(mediaInfo, null, sendRtpItem);
mediaServerService.startSendRtp(mediaInfo, sendRtpItem);
}
}catch (ControllerException e) {
logger.error("RTP推流失败: {}", e.getMessage());

View File

@ -466,7 +466,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (sendRtpItem.isTcpActive()) {
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
try {
mediaServerService.startSendRtpPassive(mediaServer, platform, sendRtpItem, 5);
mediaServerService.startSendRtpPassive(mediaServer, sendRtpItem, 5);
redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, platform);
}catch (ControllerException e) {}
}
} catch (SipException | InvalidArgumentException | ParseException e) {

View File

@ -152,7 +152,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
}else {
// 发流
try {
mediaServerService.startSendRtp(hookData.getMediaServer(),null, sendRtpItem);
mediaServerService.startSendRtp(hookData.getMediaServer(), sendRtpItem);
}catch (ControllerException e) {
logger.info("[语音喊话] 推流失败, 结果: {}", e.getMessage());
return;

View File

@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.media.service;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
@ -141,9 +140,9 @@ public interface IMediaServerService {
Boolean isStreamReady(MediaServer mediaServer, String rtp, String streamId);
void startSendRtpPassive(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem, Integer timeout);
void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout);
void startSendRtp(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem);
void startSendRtp(MediaServer mediaServer, SendRtpItem sendRtpItem);
SendRtpItem createSendRtpItem(MediaServer mediaServerItem, String addressStr, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean mediaTransmissionTCP, boolean rtcp);

View File

@ -7,8 +7,6 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
@ -24,7 +22,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.MediaServerMapper;
@ -827,18 +824,17 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
@Override
public void startSendRtpPassive(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem, Integer timeout) {
public void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
logger.info("[startSendRtpPassive] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类");
}
mediaNodeServerService.startSendRtpPassive(mediaServer, sendRtpItem, timeout);
sendPlatformStartPlayMsg(platform, sendRtpItem);
}
@Override
public void startSendRtp(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem) {
public void startSendRtp(MediaServer mediaServer, SendRtpItem sendRtpItem) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
logger.info("[startSendRtpStream] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
@ -847,21 +843,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
logger.info("[开始推流] rtp/{}, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
mediaNodeServerService.startSendRtpStream(mediaServer, sendRtpItem);
if (platform != null) {
sendPlatformStartPlayMsg(platform, sendRtpItem);
}
}
private void sendPlatformStartPlayMsg(ParentPlatform platform, SendRtpItem sendRtpItem) {
if (sendRtpItem.getPlayType() == InviteStreamType.PUSH && platform != null) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
sendRtpItem.getChannelId(), platform.getServerGBId(), platform.getName(), userSetting.getServerId(),
sendRtpItem.getMediaServerId());
messageForPushChannel.setPlatFormIndex(platform.getId());
redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
}
}
@Override

View File

@ -383,7 +383,7 @@ public class PlayServiceImpl implements IPlayService {
}, userSetting.getPlayTimeout());
try {
mediaServerService.startSendRtpPassive(mediaServerItem, null, sendRtpItem, userSetting.getPlayTimeout() * 1000);
mediaServerService.startSendRtpPassive(mediaServerItem, sendRtpItem, userSetting.getPlayTimeout() * 1000);
}catch (ControllerException e) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
logger.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
@ -1412,10 +1412,11 @@ public class PlayServiceImpl implements IPlayService {
if (mediaInfo != null) {
try {
if (sendRtpItem.isTcpActive()) {
mediaServerService.startSendRtpPassive(mediaInfo, platform, sendRtpItem, null);
mediaServerService.startSendRtpPassive(mediaInfo, sendRtpItem, null);
} else {
mediaServerService.startSendRtp(mediaInfo, platform, sendRtpItem);
mediaServerService.startSendRtp(mediaInfo, sendRtpItem);
}
redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, platform);
}catch (ControllerException e) {
logger.error("RTP推流失败: {}", e.getMessage());
startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader);

View File

@ -209,7 +209,7 @@ public class RedisRpcController {
return response;
}
try {
mediaServerService.startSendRtp(mediaServer, null, sendRtpItem);
mediaServerService.startSendRtp(mediaServer, sendRtpItem);
}catch (ControllerException exception) {
logger.info("[redis-rpc] 发流失败: {}/{}, 目标地址: {}{} {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getMsg());
WVPResult wvpResult = WVPResult.fail(exception.getCode(), exception.getMsg());

View File

@ -208,7 +208,7 @@ public interface IRedisCatchStorage {
void sendChannelAddOrDelete(String deviceId, String channelId, boolean add);
void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel);
void sendPlatformStartPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform);
void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform);

View File

@ -656,10 +656,16 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
}
@Override
public void sendPlatformStartPlayMsg(MessageForPushChannel msg) {
public void sendPlatformStartPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) {
if (sendRtpItem.getPlayType() == InviteStreamType.PUSH && platform != null) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
sendRtpItem.getChannelId(), platform.getServerGBId(), platform.getName(), userSetting.getServerId(),
sendRtpItem.getMediaServerId());
messageForPushChannel.setPlatFormIndex(platform.getId());
String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY;
logger.info("[redis发送通知] 发送 推流被上级平台观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
redisTemplate.convertAndSend(key, JSON.toJSON(msg));
logger.info("[redis发送通知] 发送 推流被上级平台观看 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId());
redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel));
}
}
@Override

View File

@ -1,19 +1,17 @@
package com.genersoft.iot.vmp.vmanager.ps;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@ -210,7 +208,7 @@ public class PsController {
SendRtpItem sendRtpItem = SendRtpItem.getInstance(app, stream, ssrc, dstIp, dstPort, !isUdp, sendInfo.getSendLocalPort(), null);
Boolean streamReady = mediaServerService.isStreamReady(mediaServer, app, stream);
if (streamReady) {
mediaServerService.startSendRtp(mediaServer, null, sendRtpItem);
mediaServerService.startSendRtp(mediaServer, sendRtpItem);
logger.info("[第三方PS服务对接->发送流] 视频流发流成功callId->{}param->{}", callId, sendRtpItem);
redisTemplate.opsForValue().set(key, sendInfo);
}else {
@ -235,7 +233,7 @@ public class PsController {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
mediaServerService.startSendRtp(mediaServer, null, sendRtpItem);
mediaServerService.startSendRtp(mediaServer, sendRtpItem);
logger.info("[第三方PS服务对接->发送流] 视频流发流成功callId->{}param->{}", callId, sendRtpItem);
redisTemplate.opsForValue().set(key, finalSendInfo);
hookSubscribe.removeSubscribe(hook);

View File

@ -1,18 +1,17 @@
package com.genersoft.iot.vmp.vmanager.rtp;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@ -31,9 +30,7 @@ import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ -247,12 +244,12 @@ public class RtpController {
Boolean streamReady = mediaServerService.isStreamReady(mediaServer, app, stream);
if (streamReady) {
if (sendRtpItemForVideo != null) {
mediaServerService.startSendRtp(mediaServer, null, sendRtpItemForVideo);
mediaServerService.startSendRtp(mediaServer, sendRtpItemForVideo);
logger.info("[第三方服务对接->发送流] 视频流发流成功callId->{}param->{}", callId, sendRtpItemForVideo);
redisTemplate.opsForValue().set(key, sendInfo);
}
if(sendRtpItemForAudio != null) {
mediaServerService.startSendRtp(mediaServer, null, sendRtpItemForAudio);
mediaServerService.startSendRtp(mediaServer, sendRtpItemForAudio);
logger.info("[第三方服务对接->发送流] 音频流发流成功callId->{}param->{}", callId, sendRtpItemForAudio);
redisTemplate.opsForValue().set(key, sendInfo);
}
@ -279,12 +276,12 @@ public class RtpController {
throw new RuntimeException(e);
}
if (sendRtpItemForVideo != null) {
mediaServerService.startSendRtp(mediaServer, null, sendRtpItemForVideo);
mediaServerService.startSendRtp(mediaServer, sendRtpItemForVideo);
logger.info("[第三方服务对接->发送流] 视频流发流成功callId->{}param->{}", callId, sendRtpItemForVideo);
redisTemplate.opsForValue().set(key, finalSendInfo);
}
if(sendRtpItemForAudio != null) {
mediaServerService.startSendRtp(mediaServer, null, sendRtpItemForAudio);
mediaServerService.startSendRtp(mediaServer, sendRtpItemForAudio);
logger.info("[第三方服务对接->发送流] 音频流发流成功callId->{}param->{}", callId, sendRtpItemForAudio);
redisTemplate.opsForValue().set(key, finalSendInfo);
}