修复当拉流代理上线很慢时,收到来自上级的 ACK 早于流注册导致推流失败的问题

pull/1747/head
Li Keqing 2025-01-03 19:32:49 +08:00
parent 5cf5c506b1
commit 469717523b
3 changed files with 56 additions and 9 deletions

View File

@ -129,12 +129,7 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
public void playProxy(CommonGBChannel channel, ErrorCallback<StreamInfo> callback){ public void playProxy(CommonGBChannel channel, ErrorCallback<StreamInfo> callback){
// 拉流代理通道 // 拉流代理通道
try { try {
StreamInfo streamInfo = streamProxyPlayService.start(channel.getDataDeviceId()); streamProxyPlayService.start(channel.getDataDeviceId(), callback);
if (streamInfo == null) {
callback.run(Response.BUSY_HERE, "busy here", null);
}else {
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}
}catch (Exception e) { }catch (Exception e) {
callback.run(Response.BUSY_HERE, "busy here", null); callback.run(Response.BUSY_HERE, "busy here", null);
} }

View File

@ -1,11 +1,12 @@
package com.genersoft.iot.vmp.streamProxy.service; package com.genersoft.iot.vmp.streamProxy.service;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
public interface IStreamProxyPlayService { public interface IStreamProxyPlayService {
StreamInfo start(int id); void start(int id, ErrorCallback<StreamInfo> callback);
StreamInfo startProxy(StreamProxy streamProxy); StreamInfo startProxy(StreamProxy streamProxy);

View File

@ -3,15 +3,27 @@ package com.genersoft.iot.vmp.streamProxy.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS; import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper; import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService; import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ConcurrentHashMap;
import javax.sip.message.Response;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
@ -29,13 +41,52 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
@Autowired @Autowired
private IMediaServerService mediaServerService; private IMediaServerService mediaServerService;
private ConcurrentHashMap<Integer, ErrorCallback<StreamInfo>> callbackMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<Integer, StreamInfo> streamInfoMap = new ConcurrentHashMap<>();
/**
*
*/
@Async("taskExecutor")
@Transactional
@EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
if ("rtsp".equals(event.getSchema())) {
StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(event.getApp(), event.getStream());
if (streamProxy != null) {
ErrorCallback<StreamInfo> callback = callbackMap.remove(streamProxy.getId());
StreamInfo streamInfo = streamInfoMap.remove(streamProxy.getId());
if (callback != null && streamInfo != null) {
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}
}
}
}
@Override @Override
public StreamInfo start(int id) { public void start(int id, ErrorCallback<StreamInfo> callback) {
StreamProxy streamProxy = streamProxyMapper.select(id); StreamProxy streamProxy = streamProxyMapper.select(id);
if (streamProxy == null) { if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到"); throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
} }
return startProxy(streamProxy); StreamInfo streamInfo = startProxy(streamProxy);
if (streamInfo == null) {
callback.run(Response.BUSY_HERE, "busy here", null);
return;
}
callbackMap.put(id, callback);
streamInfoMap.put(id, streamInfo);
MediaServer mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId());
if (mediaServer != null) {
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, streamProxy.getApp(), streamProxy.getStream());
if (mediaInfo != null) {
callbackMap.remove(id);
streamInfoMap.remove(id);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}
}
} }
@Override @Override