优化拉流代理接口哦i

结构优化
648540858 2024-01-12 18:26:14 +08:00
parent 07401b8621
commit 1d28869fd3
7 changed files with 225 additions and 345 deletions

View File

@ -29,6 +29,7 @@ import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import gov.nist.javax.sdp.TimeDescriptionImpl;
import gov.nist.javax.sdp.fields.TimeField;
import gov.nist.javax.sip.message.SIPRequest;
@ -643,19 +644,17 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream());
zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
}, userSetting.getPlatformPlayTimeout());
boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
if (!start) {
try {
responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline");
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
streamProxyService.start(gbStream.getApp(), gbStream.getStream(), (code, msg, data) -> {
if (code != ErrorCode.SUCCESS.getCode()) {
try {
responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline");
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
}
zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
dynamicTask.stop(callIdHeader.getCallId());
}
zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
dynamicTask.stop(callIdHeader.getCallId());
}
});
} else if ("push".equals(gbStream.getStreamType())) {
if (!platform.isStartOfflinePush()) {
// 平台设置中关闭了拉起离线的推流则直接回复

View File

@ -374,10 +374,7 @@ public class ZLMHttpHookListener {
mediaServerService.removeCount(param.getMediaServerId());
}
// 设置拉流代理上线/离线
int updateStatusResult = streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream());
if (updateStatusResult > 0) {
}
streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream());
if ("rtp".equals(param.getApp()) && !param.isRegist()) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
@ -546,14 +543,13 @@ public class ZLMHttpHookListener {
if (streamProxyItem.isEnableRemoveNoneReader()) {
// 无人观看自动移除
ret.put("close", true);
streamProxyService.del(param.getApp(), param.getStream());
String url = streamProxyItem.getUrl() != null ? streamProxyItem.getUrl() : streamProxyItem.getUrl();
logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", param.getApp(), param.getStream(), url);
streamProxyService.removeProxy(streamProxyItem.getId());
logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", param.getApp(), param.getStream(), streamProxyItem.getUrl());
} else if (streamProxyItem.isEnableDisableNoneReader()) {
// 无人观看停用
ret.put("close", true);
// 修改数据
streamProxyService.stop(param.getApp(), param.getStream());
streamProxyService.stop(param.getApp(), param.getStream(), null);
} else {
// 无人观看不做处理
ret.put("close", false);
@ -682,7 +678,7 @@ public class ZLMHttpHookListener {
// 拉流代理
StreamProxy streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) {
streamProxyService.start(param.getApp(), param.getStream());
streamProxyService.start(param.getApp(), param.getStream(), null);
}
DeferredResult<HookResult> result = new DeferredResult<>();
result.setResult(HookResult.SUCCESS());

View File

@ -19,19 +19,6 @@ public interface IStreamProxyService {
*/
void save(StreamProxy param, GeneralCallback<StreamInfo> callback);
/**
* zlm
* @param param
* @return
*/
JSONObject addStreamProxyToZlm(StreamProxy param);
/**
* zlm
* @param param
* @return
*/
JSONObject removeStreamProxyFromZlm(StreamProxy param);
/**
*
@ -41,12 +28,6 @@ public interface IStreamProxyService {
*/
PageInfo<StreamProxy> getAll(Integer page, Integer count);
/**
*
* @param app
* @param stream
*/
void del(String app, String stream);
/**
*
@ -54,7 +35,7 @@ public interface IStreamProxyService {
* @param stream
* @return
*/
boolean start(String app, String stream);
void start(String app, String stream, GeneralCallback<StreamInfo> callback);
/**
*
@ -62,7 +43,7 @@ public interface IStreamProxyService {
* @param app
* @param stream
*/
int updateStatus(boolean status, String app, String stream);
void updateStatus(boolean status, String app, String stream);
@ -72,7 +53,7 @@ public interface IStreamProxyService {
* @param stream
* @return
*/
boolean stop(String app, String stream);
void stop(String app, String stream, GeneralCallback<StreamInfo> callback);
/**
* ffmpeg.cmd
@ -101,13 +82,6 @@ public interface IStreamProxyService {
*/
void zlmServerOffline(String mediaServerId);
void clean();
/**
*
*/
boolean updateStreamProxy(StreamProxy streamProxyItem);
/**
*
* @return
@ -129,7 +103,7 @@ public interface IStreamProxyService {
*/
void add(StreamProxy param, GeneralCallback<StreamInfo> callback);
void delById(int gbId);
void removeProxy(int gbId);
/**
*

View File

@ -70,18 +70,12 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private StreamProxyMapper streamProxyMapper;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IVideoManagerStorage storager;
@Autowired
private UserSetting userSetting;
@ -183,9 +177,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
param.setEnable(false);
// 直接移除
if (param.isEnableRemoveNoneReader()) {
del(param.getApp(), param.getStream());
delProxyFromDb(param.getApp(), param.getStream());
}else {
updateStreamProxy(param);
updateProxyToDb(param);
}
}
});
@ -209,30 +203,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
logger.warn("[添加拉流代理] 未找到在线的ZLM...");
throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到可用的ZLM");
}
if ("ffmpeg".equalsIgnoreCase(param.getType())) {
if (ObjectUtils.isEmpty(param.getDstUrl())) {
logger.warn("[添加拉流代理] 未设置目标URL地址DstUrl");
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未设置目标URL地址");
}
logger.info("[添加拉流代理] ffmpeg源地址: {}, 目标地址为:{}", param.getUrl(), param.getDstUrl());
if (ObjectUtils.isEmpty(param.getApp()) || ObjectUtils.isEmpty(param.getStream())) {
try {
URL url = new URL(param.getDstUrl());
String path = url.getPath();
if (path.indexOf("/", 1) < 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败, 至少两层路径");
}
String app = path.substring(1, path.indexOf("/", 2));
String stream = path.substring(path.indexOf("/", 2) + 1);
param.setApp(app);
param.setStream(stream);
} catch (MalformedURLException e) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败");
}
}
}else {
logger.info("[添加拉流代理] 直接拉流,源地址: {}, app: {}, stream: {}", param.getUrl(), param.getApp(), param.getStream());
}
proxyParamHandler(param);
param.setMediaServerId(mediaInfo.getId());
StreamProxy streamProxyInDb = streamProxyMapper.selectOne(param.getApp(), param.getStream());
if (streamProxyInDb != null) {
@ -262,6 +233,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
@Override
@Transactional
public void edit(StreamProxy param, GeneralCallback<StreamInfo> callback) {
MediaServerItem mediaInfo;
StreamProxy streamProxyInDb = streamProxyMapper.selectOneById(param.getId());
@ -277,6 +249,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
logger.warn("[编辑拉流代理] 未找到在线的ZLM...");
throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到可用的ZLM");
}
proxyParamHandler(param);
param.setMediaServerId(mediaInfo.getId());
// 国标编号发生变化,修改通用通道国标变化,通用通道应发送删除再发送添加命令通知上级
// 类型变化,启用->启用:需要重新拉起视频流, 启用->未启用: 停止旧的视频流, 未启用->启用:发起新的视频流
// 流地址发生变化。停止旧的,拉起新的
@ -293,116 +267,39 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
// 如果是开启代理这是开启代理结束后的回调
final GeneralCallback<StreamInfo> startProxyCallback = (code, msg, data) -> {
if (code == ErrorCode.SUCCESS.getCode()) {
param.setStatus(true);
} else {
param.setStatus(false);
if (param.isEnableRemoveNoneReader()) {
return;
}
param.setProxyError(msg);
}
updateProxyToDb(param);
callback.run(code, msg, null);
};
if (stopOldProxy) {
if(stopOldProxy) {
stopProxy(param, mediaInfo, (code, msg, data) -> {
if (code == ErrorCode.SUCCESS.getCode()) {
if (param.isEnable()) {
startProxy(param, mediaInfo, startProxyCallback);
}else {
callback.run(code, msg, null);
}
}else {
callback.run(code, "停止旧的代理失败: " + msg, null);
}
});
}else {
if (param.isEnable()) {
startProxy(param, mediaInfo, startProxyCallback);
}
}
if (ObjectUtils.isEmpty(streamProxyInDb.getGbId())) {
if (!ObjectUtils.isEmpty(param.getGbId())) {
// 之前是空的,现在添加了国标编号
}
}else {
if (ObjectUtils.isEmpty(param.getGbId())) {
// 移除了国标编号
}else {
if (!streamProxyInDb.getGbId().equals(param.getGbId())) {
// 修改了国标编号
}
param.setStatus(false);
updateProxyToDb(param);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null);
}
}
if ("ffmpeg".equalsIgnoreCase(param.getType())) {
if (ObjectUtils.isEmpty(param.getDstUrl())) {
logger.warn("[添加拉流代理] 未设置目标URL地址DstUrl");
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未设置目标URL地址");
}
logger.info("[添加拉流代理] ffmpeg源地址: {}, 目标地址为:{}", param.getUrl(), param.getDstUrl());
if (ObjectUtils.isEmpty(param.getApp()) || ObjectUtils.isEmpty(param.getStream())) {
try {
URL url = new URL(param.getDstUrl());
String path = url.getPath();
if (path.indexOf("/", 1) < 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败, 至少两层路径");
}
String app = path.substring(1, path.indexOf("/", 2));
String stream = path.substring(path.indexOf("/", 2) + 1);
param.setApp(app);
param.setStream(stream);
} catch (MalformedURLException e) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败");
}
}
}else {
logger.info("[添加拉流代理] 直接拉流,源地址: {}, app: {}, stream: {}", param.getUrl(), param.getApp(), param.getStream());
}
param.setMediaServerId(mediaInfo.getId());
StreamProxy streamProxyInDb = streamProxyMapper.selectOne(param.getApp(), param.getStream());
if (streamProxyInDb != null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "app/stream已经存在");
}
if (!param.isEnable()) {
param.setStatus(false);
addProxyToDb(param);
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, param.getApp(), param.getStream(), null, null);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
return;
}
String talkKey = UUID.randomUUID().toString();
String delayTalkKey = UUID.randomUUID().toString();
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
dynamicTask.stop(talkKey);
param.setStatus(true);
addProxyToDb(param);
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, param.getApp(), param.getStream(), null, null);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
});
dynamicTask.startDelay(delayTalkKey, ()->{
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
dynamicTask.stop(talkKey);
callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null);
if (param.isEnableRemoveNoneReader()) {
return;
}
param.setProxyError("启用超时");
param.setStatus(false);
addProxyToDb(param);
}, 10000);
JSONObject jsonObject = addStreamProxyToZlm(param);
if (jsonObject != null && jsonObject.getInteger("code") != 0) {
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
dynamicTask.stop(talkKey);
callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null);
if (param.isEnableRemoveNoneReader()) {
return;
}
param.setProxyError("启用失败: " + jsonObject.getString("msg"));
param.setStatus(false);
addProxyToDb(param);
}
}
public void startProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback<StreamInfo> callback) {
@ -433,6 +330,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
streamProxy.setStatus(true);
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null);
logger.info("[拉流代理] 启用成功: {}/{}", streamProxy.getApp(), streamProxy.getStream());
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
});
@ -479,6 +377,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
public void stopProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback<StreamInfo> callback) {
boolean ready = mediaService.isReady(mediaInfo, streamProxy.getApp(), streamProxy.getStream());
if (ready) {
if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())){
zlmresTfulUtils.delFFmpegSource(mediaInfo, streamProxy.getStreamKey());
}else {
zlmresTfulUtils.delStreamProxy(mediaInfo, streamProxy.getStreamKey());
}
mediaService.closeStream(mediaInfo, streamProxy.getApp(), streamProxy.getStream());
}
// 检查redis内容是否正确
@ -492,6 +395,32 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null);
}
public void proxyParamHandler(StreamProxy param) {
if ("ffmpeg".equalsIgnoreCase(param.getType())) {
if (ObjectUtils.isEmpty(param.getDstUrl())) {
logger.warn("[拉流代理参数处理] 未设置目标URL地址DstUrl");
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未设置目标URL地址");
}
logger.info("[拉流代理参数处理] ffmpeg源地址: {}, 目标地址为:{}", param.getUrl(), param.getDstUrl());
if (ObjectUtils.isEmpty(param.getApp()) || ObjectUtils.isEmpty(param.getStream())) {
try {
URL url = new URL(param.getDstUrl());
String path = url.getPath();
if (path.indexOf("/", 1) < 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败, 至少两层路径");
}
String app = path.substring(1, path.indexOf("/", 2));
String stream = path.substring(path.indexOf("/", 2) + 1);
param.setApp(app);
param.setStream(stream);
} catch (MalformedURLException e) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败");
}
}
}else {
logger.info("[拉流代理参数处理] 直接拉流,源地址: {}, app: {}, stream: {}", param.getUrl(), param.getApp(), param.getStream());
}
}
private void addProxyToDb(StreamProxy param) {
// 未启用的数据可以直接保存了
@ -535,15 +464,19 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
param.setCommonGbChannelId(commonGbChannel.getCommonGbId());
}
}
if (!param.getGbId().equals(streamProxyInDb.getGbId())) {
CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(param);
commonGbChannel.setCommonGbId(streamProxyInDb.getCommonGbChannelId());
// 国标ID已经改变
commonGbChannelService.update(commonGbChannel);
}
param.setUpdateTime(DateUtil.getNow());
param.setStatus(streamProxyInDb.isStatus());
int addStreamProxyResult = streamProxyMapper.add(param);
int addStreamProxyResult = streamProxyMapper.update(param);
if (addStreamProxyResult <= 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "添加拉流代理失败");
throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存拉流代理失败");
}
}
private String getSchemaFromFFmpegCmd(String ffmpegCmd) {
ffmpegCmd = ffmpegCmd.replaceAll(" + ", " ");
String[] paramArray = ffmpegCmd.split(" ");
@ -563,73 +496,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
return null;
}
/**
*
* @param streamProxyItem
* @return
*/
@Override
public boolean updateStreamProxy(StreamProxy streamProxyItem) {
streamProxyItem.setCreateTime(DateUtil.getNow());
return streamProxyMapper.update(streamProxyItem) > 0;
}
@Override
public JSONObject addStreamProxyToZlm(StreamProxy param) {
JSONObject result = null;
MediaServerItem mediaServerItem = null;
if (param.getMediaServerId() == null) {
logger.warn("添加代理时MediaServerId 为null");
return null;
}else {
mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
}
if (mediaServerItem == null) {
return null;
}
if (zlmServerFactory.isStreamReady(mediaServerItem, param.getApp(), param.getStream())) {
zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
}
if ("ffmpeg".equalsIgnoreCase(param.getType())){
result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getUrl().trim(), param.getDstUrl(),
param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(),
param.getFfmpegCmdKey());
}else {
result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
}
if (result != null && result.getInteger("code") == 0) {
JSONObject data = result.getJSONObject("data");
if (data == null) {
logger.warn("[获取拉流代理的结果数据Data] 失败: {}", result );
return result;
}
String key = data.getString("key");
if (key == null) {
logger.warn("[获取拉流代理的结果数据Data中的KEY] 失败: {}", result );
return result;
}
param.setStreamKey(key);
streamProxyMapper.update(param);
}
return result;
}
@Override
public JSONObject removeStreamProxyFromZlm(StreamProxy param) {
if (param ==null) {
return null;
}
MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
JSONObject result = null;
if ("ffmpeg".equalsIgnoreCase(param.getType())){
result = zlmresTfulUtils.delFFmpegSource(mediaServerItem, param.getStreamKey());
}else {
result = zlmresTfulUtils.delStreamProxy(mediaServerItem, param.getStreamKey());
}
return result;
}
@Override
public PageInfo<StreamProxy> getAll(Integer page, Integer count) {
PageHelper.startPage(page, count);
@ -637,8 +503,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
return new PageInfo<>(all);
}
@Override
public void del(String app, String stream) {
private void delProxyFromDb(String app, String stream) {
StreamProxy streamProxyItem = streamProxyMapper.selectOne(app, stream);
if (streamProxyItem == null) {
return;
@ -648,70 +513,80 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
streamProxyMapper.delById(streamProxyItem.getId());
redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream);
JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
logger.info("[移除代理] 代理: {}/{}, 从zlm移除成功", app, stream);
}else {
logger.info("[移除代理] 代理: {}/{}, 从zlm移除失败", app, stream);
}
}
@Override
public void delById(int id) {
StreamProxy streamProxyItem = streamProxyMapper.selectOneById(id);
if (streamProxyItem == null) {
public void removeProxy(int id) {
StreamProxy streamProxy = streamProxyMapper.selectOneById(id);
if (streamProxy == null) {
return;
}
if (streamProxyItem.getCommonGbChannelId() > 0) {
commonGbChannelService.deleteById(streamProxyItem.getCommonGbChannelId());
}
streamProxyMapper.delById(id);
redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", streamProxyItem.getApp(), streamProxyItem.getStream());
JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
logger.info("[移除代理] 代理: {}/{}, 从zlm移除成功", streamProxyItem.getApp(), streamProxyItem.getStream());
if (streamProxy.isEnable()) {
String mediaServerId = streamProxy.getMediaServerId();
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem != null) {
boolean ready = mediaService.isReady(mediaServerItem, streamProxy.getApp(), streamProxy.getStream());
if (ready) {
stopProxy(streamProxy, mediaServerItem, (code, msg, data) -> {
if (code == ErrorCode.SUCCESS.getCode()) {
logger.info("[移除代理] 代理: {}/{}, 从zlm移除成功", streamProxy.getApp(), streamProxy.getStream());
}else {
logger.info("[移除代理] 代理: {}/{}, 从zlm移除失败", streamProxy.getApp(), streamProxy.getStream());
}
});
}
}
}else {
logger.info("[移除代理] 代理: {}/{}, 从zlm移除失败", streamProxyItem.getApp(), streamProxyItem.getStream());
delProxyFromDb(streamProxy.getApp(), streamProxy.getStream());
}
}
@Override
public boolean start(String app, String stream) {
boolean result = false;
@Transactional
public void start(String app, String stream, GeneralCallback<StreamInfo> callback) {
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
if (streamProxy != null && !streamProxy.isEnable() ) {
JSONObject jsonObject = addStreamProxyToZlm(streamProxy);
if (jsonObject == null) {
return false;
}
if (jsonObject.getInteger("code") == 0) {
result = true;
streamProxy.setEnable(true);
updateStreamProxy(streamProxy);
}else {
logger.info("启用代理失败: {}/{}->{}({})", app, stream, jsonObject.getString("msg"),
streamProxy.getUrl() == null? streamProxy.getUrl():streamProxy.getUrl());
}
} else if (streamProxy != null && streamProxy.isEnable()) {
return true ;
if (streamProxy == null || !streamProxy.isEnable()){
callback.run(ErrorCode.ERROR100.getCode(), "代理不存在或未启用", null);
return;
}
return result;
String mediaServerId = streamProxy.getMediaServerId();
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem == null) {
callback.run(ErrorCode.ERROR100.getCode(), "使用的媒体节点不存在", null);
return;
}
startProxy(streamProxy, mediaServerItem, (code, msg, data) -> {
if (code == ErrorCode.SUCCESS.getCode()) {
streamProxy.setStatus(true);
}else {
streamProxy.setStatus(false);
}
streamProxy.setUpdateTime(DateUtil.getNow());
updateProxyToDb(streamProxy);
callback.run(code, msg, data);
});
}
@Override
public boolean stop(String app, String stream) {
boolean result = false;
StreamProxy streamProxyDto = streamProxyMapper.selectOne(app, stream);
if (streamProxyDto != null && streamProxyDto.isEnable()) {
JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyDto);
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
streamProxyDto.setEnable(false);
result = updateStreamProxy(streamProxyDto);
}
@Transactional
public void stop(String app, String stream, GeneralCallback<StreamInfo> callback) {
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
if (streamProxy == null || !streamProxy.isEnable()){
callback.run(ErrorCode.ERROR100.getCode(), "代理不存在或未启用", null);
return;
}
return result;
String mediaServerId = streamProxy.getMediaServerId();
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem == null) {
callback.run(ErrorCode.ERROR100.getCode(), "使用的媒体节点不存在", null);
return;
}
stopProxy(streamProxy, mediaServerItem, (code, msg, data) -> {
streamProxy.setStatus(false);
streamProxy.setUpdateTime(DateUtil.getNow());
updateProxyToDb(streamProxy);
callback.run(code, msg, data);
});
}
@Override
@ -719,7 +594,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
JSONObject result = new JSONObject();
JSONObject mediaServerConfigResuly = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
if (mediaServerConfigResuly != null && mediaServerConfigResuly.getInteger("code") == 0
&& mediaServerConfigResuly.getJSONArray("data").size() > 0){
&& !mediaServerConfigResuly.getJSONArray("data").isEmpty()){
JSONObject mediaServerConfig = mediaServerConfigResuly.getJSONArray("data").getJSONObject(0);
for (String key : mediaServerConfig.keySet()) {
@ -738,6 +613,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
@Override
@Transactional
public void zlmServerOnline(String mediaServerId) {
// 移除开启了无人观看自动移除的流
List<StreamProxy> streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId);
@ -762,14 +638,34 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
mediaServerId, true);
for (StreamProxy streamProxyDto : streamProxyListForEnable) {
logger.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
JSONObject jsonObject = addStreamProxyToZlm(streamProxyDto);
if (jsonObject == null) {
// 设置为离线
logger.info("恢复流代理失败" + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
updateStatus(false, streamProxyDto.getApp(), streamProxyDto.getStream());
MediaServerItem mediaServerItem = mediaServerService.getOne(streamProxyDto.getMediaServerId());
startProxy(streamProxyDto, mediaServerItem, (code, msg, data) -> {
if (code == ErrorCode.ERROR100.getCode()) {
if (!streamProxyDto.isStatus()) {
updateStatusById(streamProxyDto, true);
}
} else {
if (streamProxyDto.isStatus()) {
updateStatusById(streamProxyDto, false);
}
}
});
}
}
@Transactional
public void updateStatusById(StreamProxy streamProxy, boolean status) {
streamProxyMapper.updateStatusById(streamProxy.getId(), status);
if (streamProxy.getCommonGbChannelId() > 0) {
List<Integer> ids = new ArrayList<>();
ids.add(streamProxy.getCommonGbChannelId());
if (status) {
commonGbChannelService.onlineForList(ids);
}else {
updateStatus(true, streamProxyDto.getApp(), streamProxyDto.getStream());
commonGbChannelService.offlineForList(ids);
}
}
}
@ -811,13 +707,15 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
@Override
public void clean() {
}
@Override
public int updateStatus(boolean status, String app, String stream) {
return streamProxyMapper.updateStatus(app, stream, status);
@Transactional
public void updateStatus(boolean status, String app, String stream) {
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
if (streamProxy == null) {
return;
}
if (streamProxy.getCommonGbChannelId() > 0) {
updateStatusById(streamProxy, status);
}
}
private void syncPullStream(String mediaServerId){
@ -843,7 +741,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
}
}
if (stringStreamInfoMap.size() == 0) {
if (stringStreamInfoMap.isEmpty()) {
redisCatchStorage.removeStream(mediaServerId, "PULL");
}else {
for (String key : stringStreamInfoMap.keySet()) {
@ -875,40 +773,40 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
*
*/
@Scheduled(cron = "* 0/10 * * * ?")
@Transactional
public void asyncCheckStreamProxyStatus() {
List<MediaServerItem> all = mediaServerService.getAllOnline();
if (CollectionUtils.isEmpty(all)){
return;
}
Map<String, MediaServerItem> serverItemMap = all.stream().collect(Collectors.toMap(MediaServerItem::getId, Function.identity(), (m1, m2) -> m1));
Map<String, MediaServerItem> serverItemMap = all.stream().collect(
Collectors.toMap(MediaServerItem::getId, Function.identity(), (m1, m2) -> m1));
List<StreamProxy> list = getAllForEnable();
if (CollectionUtils.isEmpty(list)){
return;
}
for (StreamProxy streamProxyItem : list) {
MediaServerItem mediaServerItem = serverItemMap.get(streamProxyItem.getMediaServerId());
// TODO 支持其他 schema
JSONObject mediaInfo = zlmresTfulUtils.isMediaOnline(mediaServerItem, streamProxyItem.getApp(), streamProxyItem.getStream(), "rtsp");
JSONObject mediaInfo = zlmresTfulUtils.isMediaOnline(mediaServerItem, streamProxyItem.getApp(),
streamProxyItem.getStream(), "rtsp");
if (mediaInfo == null){
streamProxyItem.setStatus(false);
if (streamProxyItem.isStatus()) {
updateStatusById(streamProxyItem, false);
}
} else {
if (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")) {
streamProxyItem.setStatus(true);
if (!streamProxyItem.isStatus()) {
updateStatusById(streamProxyItem, true);
}
} else {
streamProxyItem.setStatus(false);
if (streamProxyItem.isStatus()) {
updateStatusById(streamProxyItem, false);
}
}
}
updateStreamProxy(streamProxyItem);
}
}
@ -921,4 +819,5 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
public List<StreamProxy> getAllForEnable() {
return streamProxyMapper.selectForEnable(true);
}
}

View File

@ -75,6 +75,11 @@ public interface StreamProxyMapper {
"WHERE app=#{app} AND stream=#{stream}")
int updateStatus(@Param("app") String app, @Param("stream") String stream, @Param("status") boolean status);
@Update("UPDATE wvp_stream_proxy " +
"SET status=#{status} " +
"WHERE id=#{id}")
int updateStatusById(@Param("id") int id, @Param("status") boolean status);
@Delete("DELETE FROM wvp_stream_proxy WHERE enable_remove_none_reader=true AND media_server_id=#{mediaServerId}")
void deleteAutoRemoveItemByMediaServerId(String mediaServerId);

View File

@ -94,8 +94,8 @@ public class MediaController {
return new StreamContent(streamInfo);
}else {
//获取流失败,重启拉流后重试一次
streamProxyService.stop(app,stream);
boolean start = streamProxyService.start(app, stream);
streamProxyService.stop(app,stream, null);
streamProxyService.start(app, stream, null);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {

View File

@ -97,7 +97,7 @@ public class StreamProxyController {
}
StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (streamProxyItem != null) {
streamProxyService.del(param.getApp(), param.getStream());
streamProxyService.removeProxy(streamProxyItem.getId());
}
DeferredResult<Object> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
@ -226,7 +226,11 @@ public class StreamProxyController {
if (app == null || stream == null) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), app == null ?"app不能为null":"stream不能为null");
}else {
streamProxyService.del(app, stream);
StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app,stream);
if (streamProxyItem == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "代理不存在");
}
streamProxyService.removeProxy(streamProxyItem.getId());
}
}
@ -240,7 +244,7 @@ public class StreamProxyController {
if (proxy.getId() <= 0) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "缺少ID");
}else {
streamProxyService.delById(proxy.getId());
streamProxyService.removeProxy(proxy.getId());
}
}
@ -249,12 +253,14 @@ public class StreamProxyController {
@Operation(summary = "启用代理", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "app", description = "应用名", required = true)
@Parameter(name = "stream", description = "流id", required = true)
public void start(String app, String stream){
public DeferredResult<WVPResult> start(String app, String stream){
logger.info("启用代理: " + app + "/" + stream);
boolean result = streamProxyService.start(app, stream);
if (!result) {
throw new ControllerException(ErrorCode.ERROR100);
}
DeferredResult<WVPResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
streamProxyService.start(app, stream, (code, msg, data) -> {
WVPResult<Object> wvpResult = new WVPResult<>(code, msg, null);
result.setResult(wvpResult);
});
return result;
}
@GetMapping(value = "/stop")
@ -262,12 +268,13 @@ public class StreamProxyController {
@Operation(summary = "停用代理", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "app", description = "应用名", required = true)
@Parameter(name = "stream", description = "流id", required = true)
public void stop(String app, String stream){
public DeferredResult<WVPResult> stop(String app, String stream){
logger.info("停用代理: " + app + "/" + stream);
boolean result = streamProxyService.stop(app, stream);
if (!result) {
logger.info("停用代理失败: " + app + "/" + stream);
throw new ControllerException(ErrorCode.ERROR100);
}
DeferredResult<WVPResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
streamProxyService.stop(app, stream, (code, msg, data) -> {
WVPResult<Object> wvpResult = new WVPResult<>(code, msg, null);
result.setResult(wvpResult);
});
return result;
}
}