From a9f88be8c592664332de2587f9304e371ab34192 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 14 Jul 2023 19:46:41 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8B=89=E6=B5=81=E4=BB=A3?= =?UTF-8?q?=E7=90=86=E7=9A=84=E6=B7=BB=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/StreamProxyServiceImpl.java | 36 +++++++++++++------ 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 71711389..a1c5b1c4 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -4,14 +4,13 @@ import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.GeneralCallback; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; @@ -41,6 +40,7 @@ import org.springframework.util.ObjectUtils; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; /** * 视频代理业务 @@ -92,6 +92,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Autowired private ZlmHttpHookSubscribe hookSubscribe; + @Autowired + private DynamicTask dynamicTask; + @Autowired DataSourceTransactionManager dataSourceTransactionManager; @@ -158,17 +161,28 @@ public class StreamProxyServiceImpl implements IStreamProxyService { return; } - HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId()); - hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( - mediaInfo, param.getApp(), param.getStream(), null, null); - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); - }); + String talkKey = UUID.randomUUID().toString(); + dynamicTask.startCron(talkKey, ()->{ + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false); + if (streamInfo != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + } + }, 1000); + String delayTalkKey = UUID.randomUUID().toString(); + dynamicTask.startDelay(delayTalkKey, ()->{ + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false); + if (streamInfo != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + }else { + dynamicTask.stop(talkKey); + callback.run(ErrorCode.ERROR100.getCode(), "超时", null); + } + }, 5000); if (param.isEnable()) { JSONObject jsonObject = addStreamProxyToZlm(param); if (jsonObject != null && jsonObject.getInteger("code") == 0) { - hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + dynamicTask.stop(talkKey); StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, param.getApp(), param.getStream(), null, null); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); @@ -295,10 +309,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService { return null; } if ("default".equals(param.getType())){ - result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl(), + result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(), param.isEnableAudio(), param.isEnableMp4(), param.getRtpType()); }else if ("ffmpeg".equals(param.getType())) { - result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl(), param.getDstUrl(), + result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl().trim(), param.getDstUrl(), param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(), param.getFfmpegCmdKey()); }