临时提交

结构优化
648540858 2024-01-12 14:52:57 +08:00
parent d287ff7763
commit 9c114823af
5 changed files with 41 additions and 73 deletions

View File

@ -10,7 +10,6 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
@ -548,7 +547,7 @@ public class ZLMHttpHookListener {
// 无人观看自动移除 // 无人观看自动移除
ret.put("close", true); ret.put("close", true);
streamProxyService.del(param.getApp(), param.getStream()); streamProxyService.del(param.getApp(), param.getStream());
String url = streamProxyItem.getUrl() != null ? streamProxyItem.getUrl() : streamProxyItem.getSrcUrl(); String url = streamProxyItem.getUrl() != null ? streamProxyItem.getUrl() : streamProxyItem.getUrl();
logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", param.getApp(), param.getStream(), url); logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", param.getApp(), param.getStream(), url);
} else if (streamProxyItem.isEnableDisableNoneReader()) { } else if (streamProxyItem.isEnableDisableNoneReader()) {
// 无人观看停用 // 无人观看停用

View File

@ -19,8 +19,6 @@ public class StreamProxy {
private String mediaServerId; private String mediaServerId;
@Schema(description = "拉流地址") @Schema(description = "拉流地址")
private String url; private String url;
@Schema(description = "拉流地址")
private String srcUrl;
@Schema(description = "目标地址") @Schema(description = "目标地址")
private String dstUrl; private String dstUrl;
@Schema(description = "超时时间") @Schema(description = "超时时间")
@ -116,14 +114,6 @@ public class StreamProxy {
this.url = url; this.url = url;
} }
public String getSrcUrl() {
return srcUrl;
}
public void setSrcUrl(String srcUrl) {
this.srcUrl = srcUrl;
}
public String getDstUrl() { public String getDstUrl() {
return dstUrl; return dstUrl;
} }

View File

@ -250,55 +250,22 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
callback.run(code, msg, data); callback.run(code, msg, data);
if (code == ErrorCode.SUCCESS.getCode()) { if (code == ErrorCode.SUCCESS.getCode()) {
param.setStatus(true); param.setStatus(true);
addProxyToDb(param);
} else { } else {
if (param.isEnableRemoveNoneReader()) {
return;
}
param.setProxyError(msg);
param.setStatus(false);
} }
addProxyToDb(param);
}); });
String talkKey = UUID.randomUUID().toString();
String delayTalkKey = UUID.randomUUID().toString();
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
dynamicTask.stop(talkKey);
param.setStatus(true);
addProxyToDb(param);
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, param.getApp(), param.getStream(), null, null);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
});
dynamicTask.startDelay(delayTalkKey, ()->{
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
dynamicTask.stop(talkKey);
callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null);
if (param.isEnableRemoveNoneReader()) {
return;
}
param.setProxyError("启用超时");
param.setStatus(false);
addProxyToDb(param);
}, 10000);
JSONObject jsonObject = addStreamProxyToZlm(param);
if (jsonObject != null && jsonObject.getInteger("code") != 0) {
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
dynamicTask.stop(talkKey);
callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null);
if (param.isEnableRemoveNoneReader()) {
return;
}
param.setProxyError("启用失败: " + jsonObject.getString("msg"));
param.setStatus(false);
addProxyToDb(param);
}
} }
@Override @Override
public void edit(StreamProxy param, GeneralCallback<StreamInfo> callback) { public void edit(StreamProxy param, GeneralCallback<StreamInfo> callback) {
MediaServerItem mediaInfo; MediaServerItem mediaInfo;
StreamProxy streamProxyInDb = streamProxyMapper.selectOneById(param.getId()); StreamProxy streamProxyInDb = streamProxyMapper.selectOneById(param.getId());
if (streamProxyInDb != null) { if (streamProxyInDb == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "代理不存在"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "代理不存在");
} }
if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){ if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){
@ -316,6 +283,36 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
// ffmpeg类型下目标流地址变化停止旧的拉起新的 // ffmpeg类型下目标流地址变化停止旧的拉起新的
// 节点变化: 停止旧的,拉起新的 // 节点变化: 停止旧的,拉起新的
// ffmpeg命令模板变化 停止旧的,拉起新的 // ffmpeg命令模板变化 停止旧的,拉起新的
boolean stopOldProxy = !streamProxyInDb.getType().equals(param.getType())
|| !streamProxyInDb.getUrl().equals(param.getUrl())
|| !streamProxyInDb.getMediaServerId().equals(param.getMediaServerId())
|| (streamProxyInDb.getType().equals("ffmpeg") && (
streamProxyInDb.getDstUrl().equals(param.getDstUrl())
|| streamProxyInDb.getFfmpegCmdKey().equals(param.getFfmpegCmdKey())
));
// 如果是开启代理这是开启代理结束后的回调
final GeneralCallback<StreamInfo> startProxyCallback = (code, msg, data) -> {
};
if (stopOldProxy) {
stopProxy(param, mediaInfo, (code, msg, data) -> {
if (code == ErrorCode.SUCCESS.getCode()) {
if (param.isEnable()) {
startProxy(param, mediaInfo, startProxyCallback);
}
}
});
}else {
if (param.isEnable()) {
startProxy(param, mediaInfo, startProxyCallback);
}
}
if (ObjectUtils.isEmpty(streamProxyInDb.getGbId())) { if (ObjectUtils.isEmpty(streamProxyInDb.getGbId())) {
if (!ObjectUtils.isEmpty(param.getGbId())) { if (!ObjectUtils.isEmpty(param.getGbId())) {
// 之前是空的,现在添加了国标编号 // 之前是空的,现在添加了国标编号
@ -447,7 +444,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}, 10000); }, 10000);
JSONObject result; JSONObject result;
if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())){ if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())){
result = zlmresTfulUtils.addFFmpegSource(mediaInfo, streamProxy.getSrcUrl().trim(), streamProxy.getDstUrl(), result = zlmresTfulUtils.addFFmpegSource(mediaInfo, streamProxy.getUrl().trim(), streamProxy.getDstUrl(),
streamProxy.getTimeoutMs() + "", streamProxy.isEnableAudio(), streamProxy.isEnableMp4(), streamProxy.getTimeoutMs() + "", streamProxy.isEnableAudio(), streamProxy.isEnableMp4(),
streamProxy.getFfmpegCmdKey()); streamProxy.getFfmpegCmdKey());
}else { }else {
@ -594,7 +591,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream()); zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
} }
if ("ffmpeg".equalsIgnoreCase(param.getType())){ if ("ffmpeg".equalsIgnoreCase(param.getType())){
result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl().trim(), param.getDstUrl(), result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getUrl().trim(), param.getDstUrl(),
param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(), param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(),
param.getFfmpegCmdKey()); param.getFfmpegCmdKey());
}else { }else {
@ -695,7 +692,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
updateStreamProxy(streamProxy); updateStreamProxy(streamProxy);
}else { }else {
logger.info("启用代理失败: {}/{}->{}({})", app, stream, jsonObject.getString("msg"), logger.info("启用代理失败: {}/{}->{}({})", app, stream, jsonObject.getString("msg"),
streamProxy.getSrcUrl() == null? streamProxy.getUrl():streamProxy.getSrcUrl()); streamProxy.getUrl() == null? streamProxy.getUrl():streamProxy.getUrl());
} }
} else if (streamProxy != null && streamProxy.isEnable()) { } else if (streamProxy != null && streamProxy.isEnable()) {
return true ; return true ;

View File

@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils; import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
@ -27,8 +26,6 @@ import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult; import org.springframework.web.context.request.async.DeferredResult;
import java.util.UUID;
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
/** /**
* *
@ -142,11 +139,6 @@ public class StreamProxyController {
if (ObjectUtils.isEmpty(param.getGbId())) { if (ObjectUtils.isEmpty(param.getGbId())) {
param.setGbId(null); param.setGbId(null);
} }
if (ObjectUtils.isEmpty(param.getSrcUrl())) {
param.setSrcUrl(param.getUrl());
}else {
param.setUrl(param.getSrcUrl());
}
DeferredResult<Object> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); DeferredResult<Object> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
// 录像查询以channelId作为deviceId查询 // 录像查询以channelId作为deviceId查询
@ -189,11 +181,6 @@ public class StreamProxyController {
if (ObjectUtils.isEmpty(param.getGbId())) { if (ObjectUtils.isEmpty(param.getGbId())) {
param.setGbId(null); param.setGbId(null);
} }
if (ObjectUtils.isEmpty(param.getSrcUrl())) {
param.setSrcUrl(param.getUrl());
}else {
param.setUrl(param.getSrcUrl());
}
DeferredResult<Object> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); DeferredResult<Object> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
// 录像查询以channelId作为deviceId查询 // 录像查询以channelId作为deviceId查询

View File

@ -33,9 +33,6 @@
<el-form-item label="拉流地址" prop="url" v-if="proxyParam.type=='default'"> <el-form-item label="拉流地址" prop="url" v-if="proxyParam.type=='default'">
<el-input v-model="proxyParam.url" clearable></el-input> <el-input v-model="proxyParam.url" clearable></el-input>
</el-form-item> </el-form-item>
<el-form-item label="拉流地址" prop="srcUrl" v-if="proxyParam.type=='ffmpeg'">
<el-input v-model="proxyParam.srcUrl" clearable></el-input>
</el-form-item>
<el-form-item label="超时时间:毫秒" prop="timeoutMs" v-if="proxyParam.type=='ffmpeg'"> <el-form-item label="超时时间:毫秒" prop="timeoutMs" v-if="proxyParam.type=='ffmpeg'">
<el-input v-model="proxyParam.timeoutMs" clearable></el-input> <el-input v-model="proxyParam.timeoutMs" clearable></el-input>
</el-form-item> </el-form-item>
@ -158,7 +155,6 @@ export default {
app: null, app: null,
stream: null, stream: null,
url: "", url: "",
srcUrl: null,
timeoutMs: null, timeoutMs: null,
ffmpegCmdKey: null, ffmpegCmdKey: null,
gbId: null, gbId: null,
@ -180,7 +176,6 @@ export default {
app: [{ required: true, message: "请输入应用名", trigger: "blur" }], app: [{ required: true, message: "请输入应用名", trigger: "blur" }],
stream: [{ required: true, message: "请输入流ID", trigger: "blur" }], stream: [{ required: true, message: "请输入流ID", trigger: "blur" }],
url: [{ required: true, message: "请输入要代理的流", trigger: "blur" }], url: [{ required: true, message: "请输入要代理的流", trigger: "blur" }],
srcUrl: [{ required: true, message: "请输入要代理的流", trigger: "blur" }],
timeoutMs: [{ required: true, message: "请输入FFmpeg推流成功超时时间", trigger: "blur" }], timeoutMs: [{ required: true, message: "请输入FFmpeg推流成功超时时间", trigger: "blur" }],
ffmpegCmdKey: [{ required: false, message: "请输入FFmpeg命令参数模板可选", trigger: "blur" }], ffmpegCmdKey: [{ required: false, message: "请输入FFmpeg命令参数模板可选", trigger: "blur" }],
}, },