From e8411bc797e85ce9147b2ebe4fcc0008ee73ded0 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 25 Sep 2023 16:22:36 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=88=AA=E5=9B=BE?= =?UTF-8?q?=E6=88=AA=E5=9B=BE=E6=9C=AA=E5=85=B3=E9=97=AD=E6=B5=81=20#1087?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/vmanager/gb28181/device/DeviceQuery.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java index b182b265..1986b653 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java @@ -36,6 +36,7 @@ import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; +import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletResponse; import javax.sip.InvalidArgumentException; import javax.sip.SipException; @@ -472,7 +473,10 @@ public class DeviceQuery { try { final InputStream in = Files.newInputStream(new File("snap" + File.separator + deviceId + "_" + channelId + (mark == null? ".jpg": ("_" + mark + ".jpg"))).toPath()); resp.setContentType(MediaType.IMAGE_PNG_VALUE); + ServletOutputStream outputStream = resp.getOutputStream(); IOUtils.copy(in, resp.getOutputStream()); + in.close(); + outputStream.close(); } catch (IOException e) { resp.setStatus(HttpServletResponse.SC_NOT_FOUND); } From 483d5e04a71b6632419699df1077ac5a5457ad38 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 25 Sep 2023 18:51:02 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8B=89=E6=B5=81?= =?UTF-8?q?=E4=BB=A3=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../subscribe/catalog/CatalogEventLister.java | 2 +- .../iot/vmp/media/zlm/ZLMRESTfulUtils.java | 21 ++++++++++++---- .../service/impl/StreamProxyServiceImpl.java | 24 ++++++++++++------- .../streamProxy/StreamProxyController.java | 3 +++ 4 files changed, 36 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 89ecb186..122bc545 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -93,7 +93,7 @@ public class CatalogEventLister implements ApplicationListener { } if (event.getGbStreams() != null && event.getGbStreams().size() > 0){ for (GbStream gbStream : event.getGbStreams()) { - if (gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) { + if (gbStream != null && gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) { continue; } DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index e58f2aeb..0403eecf 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -32,13 +32,20 @@ public class ZLMRESTfulUtils { } private OkHttpClient getClient(){ + return getClient(null); + } + + private OkHttpClient getClient(Integer readTimeOut){ if (client == null) { + if (readTimeOut == null) { + readTimeOut = 10; + } OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder(); //todo 暂时写死超时时间 均为5s // 设置连接超时时间 - httpClientBuilder.connectTimeout(5,TimeUnit.SECONDS); + httpClientBuilder.connectTimeout(8,TimeUnit.SECONDS); // 设置读取超时时间 - httpClientBuilder.readTimeout(10,TimeUnit.SECONDS); + httpClientBuilder.readTimeout(readTimeOut,TimeUnit.SECONDS); // 设置连接池 httpClientBuilder.connectionPool(new ConnectionPool(16, 5, TimeUnit.MINUTES)); if (logger.isDebugEnabled()) { @@ -55,9 +62,13 @@ public class ZLMRESTfulUtils { } - public JSONObject sendPost(MediaServerItem mediaServerItem, String api, Map param, RequestCallback callback) { - OkHttpClient client = getClient(); + return sendPost(mediaServerItem, api, param, callback, null); + } + + + public JSONObject sendPost(MediaServerItem mediaServerItem, String api, Map param, RequestCallback callback, Integer readTimeOut) { + OkHttpClient client = getClient(readTimeOut); if (mediaServerItem == null) { return null; @@ -310,7 +321,7 @@ public class ZLMRESTfulUtils { param.put("enable_mp4", enable_mp4?1:0); param.put("enable_audio", enable_audio?1:0); param.put("rtp_type", rtp_type); - return sendPost(mediaServerItem, "addStreamProxy",param, null); + return sendPost(mediaServerItem, "addStreamProxy",param, null, 20); } public JSONObject closeStreams(MediaServerItem mediaServerItem, String app, String stream) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index a4e00ad4..f36cff90 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -11,6 +11,8 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; @@ -160,15 +162,20 @@ public class StreamProxyServiceImpl implements IStreamProxyService { callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null); return; } - + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId()); + hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( + mediaInfo, param.getApp(), param.getStream(), null, null); + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + }); if (param.isEnable()) { String talkKey = UUID.randomUUID().toString(); - dynamicTask.startCron(talkKey, ()->{ - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false); - if (streamInfo != null) { - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); - } - }, 1000); +// dynamicTask.startCron(talkKey, ()->{ +// StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false); +// if (streamInfo != null) { +// callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); +// } +// }, 3000); String delayTalkKey = UUID.randomUUID().toString(); dynamicTask.startDelay(delayTalkKey, ()->{ StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false); @@ -178,9 +185,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService { dynamicTask.stop(talkKey); callback.run(ErrorCode.ERROR100.getCode(), "超时", null); } - }, 5000); + }, 7000); JSONObject jsonObject = addStreamProxyToZlm(param); if (jsonObject != null && jsonObject.getInteger("code") == 0) { + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); dynamicTask.stop(talkKey); StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, param.getApp(), param.getStream(), null, null); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java index 0689f423..fe6df729 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java @@ -80,6 +80,9 @@ public class StreamProxyController { if (ObjectUtils.isEmpty(param.getType())) { param.setType("default"); } + if (ObjectUtils.isEmpty(param.getRtpType())) { + param.setRtpType("1"); + } if (ObjectUtils.isEmpty(param.getGbId())) { param.setGbId(null); } From 7620527340fbd0a7525919f9a8b1893722bb4e6c Mon Sep 17 00:00:00 2001 From: chenzhangyue Date: Thu, 28 Sep 2023 16:00:05 +0800 Subject: [PATCH 3/4] fix hasSubChannel --- .../genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index 1bdae76b..1ff0d29f 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -167,8 +167,8 @@ public interface DeviceChannelMapper { " AND (dc.channel_id LIKE concat('%',#{query},'%') OR dc.name LIKE concat('%',#{query},'%') OR dc.name LIKE concat('%',#{query},'%')) " + " AND dc.status=true " + " AND dc.status=false " + - " AND dc.sub_count > 0 " + - " AND dc.sub_count = 0 " + + " AND dc.sub_count > 0 " + + " AND dc.sub_count = 0 " + " AND dc.id not in (select device_channel_id from wvp_platform_gb_channel where platform_id=#{platformId} ) " + " AND pgc.platform_id = #{platformId} and pgc.catalog_id=#{catalogId} " + " ORDER BY dc.device_id, dc.channel_id ASC" + From 130dc5d82da0e89241eefd4ea91ba7d861de866d Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Sat, 30 Sep 2023 06:09:16 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8B=89=E6=B5=81?= =?UTF-8?q?=E4=BB=A3=E7=90=86=E9=80=BB=E8=BE=91=EF=BC=8C=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?ffmpeg=E6=8B=89=E6=B5=81=E4=BB=A3=E7=90=86=E9=89=B4=E6=9D=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/2.6.9更新.sql | 3 + sql/初始化.sql | 1 + .../subscribe/catalog/CatalogEventLister.java | 5 +- .../vmp/media/zlm/ZLMHttpHookListener.java | 7 ++ .../iot/vmp/media/zlm/ZLMRESTfulUtils.java | 6 ++ .../vmp/media/zlm/dto/StreamProxyItem.java | 9 +++ .../service/impl/StreamProxyServiceImpl.java | 65 ++++++++++++------- .../vmp/storager/dao/StreamProxyMapper.java | 7 +- .../streamProxy/StreamProxyController.java | 14 ++++ 9 files changed, 91 insertions(+), 26 deletions(-) diff --git a/sql/2.6.9更新.sql b/sql/2.6.9更新.sql index 514b39e4..769004d2 100644 --- a/sql/2.6.9更新.sql +++ b/sql/2.6.9更新.sql @@ -3,3 +3,6 @@ alter table wvp_device_channel alter table wvp_platform add auto_push_channel bool default false + +alter table wvp_stream_proxy + add stream_key varying(255) diff --git a/sql/初始化.sql b/sql/初始化.sql index 64c404f5..b95dd874 100644 --- a/sql/初始化.sql +++ b/sql/初始化.sql @@ -244,6 +244,7 @@ create table wvp_stream_proxy ( create_time character varying(50), name character varying(255), update_time character varying(50), + stream_key character varying(255), enable_disable_none_reader bool default false, constraint uk_stream_proxy_app_stream unique (app, stream) ); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 122bc545..0448cd20 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -93,7 +93,10 @@ public class CatalogEventLister implements ApplicationListener { } if (event.getGbStreams() != null && event.getGbStreams().size() > 0){ for (GbStream gbStream : event.getGbStreams()) { - if (gbStream != null && gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) { + if (gbStream != null + && gbStream.getStreamType() != null + && gbStream.getStreamType().equals("push") + && !userSetting.isUsePushingAsStatus()) { continue; } DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 217d00a1..3c76883a 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -199,6 +199,13 @@ public class ZLMHttpHookListener { } // 推流鉴权的处理 if (!"rtp".equals(param.getApp())) { + StreamProxyItem stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); + if (stream != null) { + HookResultForOnPublish result = HookResultForOnPublish.SUCCESS(); + result.setEnable_audio(stream.isEnableAudio()); + result.setEnable_mp4(stream.isEnableMp4()); + return result; + } if (userSetting.getPushAuthority()) { // 推流鉴权 if (param.getParams() == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 0403eecf..84e9e7e6 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -272,6 +272,12 @@ public class ZLMRESTfulUtils { return sendPost(mediaServerItem, "delFFmpegSource",param, null); } + public JSONObject delStreamProxy(MediaServerItem mediaServerItem, String key){ + Map param = new HashMap<>(); + param.put("key", key); + return sendPost(mediaServerItem, "delStreamProxy",param, null); + } + public JSONObject getMediaServerConfig(MediaServerItem mediaServerItem){ return sendPost(mediaServerItem, "getServerConfig",null, null); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java index dd517e3d..0486d00d 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java @@ -41,6 +41,9 @@ public class StreamProxyItem extends GbStream { @Schema(description = "是否 无人观看时自动停用") private boolean enableDisableNoneReader; + @Schema(description = "拉流代理时zlm返回的key,用于停止拉流代理") + private String streamKey; + public String getType() { return type; } @@ -167,5 +170,11 @@ public class StreamProxyItem extends GbStream { this.enableAudio = enable_audio; } + public String getStreamKey() { + return streamKey; + } + public void setStreamKey(String streamKey) { + this.streamKey = streamKey; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index f36cff90..7fbe7691 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; @@ -61,6 +62,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Autowired private ZLMRESTfulUtils zlmresTfulUtils; + @Autowired + private ZLMServerFactory zlmServerFactory; + @Autowired private StreamProxyMapper streamProxyMapper; @@ -145,7 +149,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, param.getApp(), param.getStream()); }else { - dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(), + dstUrl = String.format("rtsp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtspPort(), param.getApp(), param.getStream()); } param.setDstUrl(dstUrl); @@ -170,12 +174,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService { }); if (param.isEnable()) { String talkKey = UUID.randomUUID().toString(); -// dynamicTask.startCron(talkKey, ()->{ -// StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false); -// if (streamInfo != null) { -// callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); -// } -// }, 3000); String delayTalkKey = UUID.randomUUID().toString(); dynamicTask.startDelay(delayTalkKey, ()->{ StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false); @@ -318,13 +316,32 @@ public class StreamProxyServiceImpl implements IStreamProxyService { if (mediaServerItem == null) { return null; } - if ("default".equals(param.getType())){ - result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(), - param.isEnableAudio(), param.isEnableMp4(), param.getRtpType()); - }else if ("ffmpeg".equals(param.getType())) { + if (zlmServerFactory.isStreamReady(mediaServerItem, param.getApp(), param.getStream())) { + zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream()); + } + if ("ffmpeg".equalsIgnoreCase(param.getType())){ result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl().trim(), param.getDstUrl(), param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(), param.getFfmpegCmdKey()); + }else { + result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(), + param.isEnableAudio(), param.isEnableMp4(), param.getRtpType()); + } + System.out.println("addStreamProxyToZlm===="); + System.out.println(result); + if (result != null && result.getInteger("code") == 0) { + JSONObject data = result.getJSONObject("data"); + if (data == null) { + logger.warn("[获取拉流代理的结果数据Data] 失败: {}", result ); + return result; + } + String key = data.getString("key"); + if (key == null) { + logger.warn("[获取拉流代理的结果数据Data中的KEY] 失败: {}", result ); + return result; + } + param.setStreamKey(key); + streamProxyMapper.update(param); } return result; } @@ -335,7 +352,12 @@ public class StreamProxyServiceImpl implements IStreamProxyService { return null; } MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId()); - JSONObject result = zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream()); + JSONObject result = null; + if ("ffmpeg".equalsIgnoreCase(param.getType())){ + result = zlmresTfulUtils.delFFmpegSource(mediaServerItem, param.getStreamKey()); + }else { + result = zlmresTfulUtils.delStreamProxy(mediaServerItem, param.getStreamKey()); + } return result; } @@ -350,19 +372,18 @@ public class StreamProxyServiceImpl implements IStreamProxyService { if (streamProxyItem != null) { gbStreamService.sendCatalogMsg(streamProxyItem, CatalogEvent.DEL); - JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem); - if (jsonObject != null && jsonObject.getInteger("code") == 0) { - // 如果关联了国标那么移除关联 - int i = platformGbStreamMapper.delByAppAndStream(app, stream); - gbStreamMapper.del(app, stream); - System.out.println(); - // TODO 如果关联的推流, 那么状态设置为离线 - } + // 如果关联了国标那么移除关联 + platformGbStreamMapper.delByAppAndStream(app, stream); + gbStreamMapper.del(app, stream); videoManagerStorager.deleteStreamProxy(app, stream); redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream); + JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem); + if (jsonObject != null && jsonObject.getInteger("code") == 0) { + logger.info("[移除代理]: 代理: {}/{}, 从zlm移除成功", app, stream); + }else { + logger.info("[移除代理]: 代理: {}/{}, 从zlm移除失败", app, stream); + } } - - } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java index 34a0673e..6ad36cef 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java @@ -12,9 +12,9 @@ import java.util.List; public interface StreamProxyMapper { @Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, url, src_url, dst_url, " + - "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, enable_remove_none_reader, enable_disable_none_reader, create_time) VALUES" + + "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, stream_key, enable_remove_none_reader, enable_disable_none_reader, create_time) VALUES" + "(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{srcUrl}, #{dstUrl}, " + - "#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, " + + "#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, #{streamKey}, " + "#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} )") int add(StreamProxyItem streamProxyDto); @@ -33,6 +33,7 @@ public interface StreamProxyMapper { "enable_audio=#{enableAudio}, " + "enable=#{enable}, " + "status=#{status}, " + + "stream_key=#{streamKey}, " + "enable_remove_none_reader=#{enableRemoveNoneReader}, " + "enable_disable_none_reader=#{enableDisableNoneReader}, " + "enable_mp4=#{enableMp4} " + @@ -45,7 +46,7 @@ public interface StreamProxyMapper { @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream order by st.create_time desc") List selectAll(); - @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable=#{enable} order by st.create_time desc") + @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude, 'proxy' as streamType FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable=#{enable} order by st.create_time desc") List selectForEnable(boolean enable); @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.app=#{app} AND st.stream=#{stream} order by st.create_time desc") diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java index fe6df729..e28ca11a 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java @@ -67,6 +67,16 @@ public class StreamProxyController { return streamProxyService.getAll(page, count); } + @Operation(summary = "查询流代理") + @Parameter(name = "app", description = "应用名") + @Parameter(name = "stream", description = "流Id") + @GetMapping(value = "/one") + @ResponseBody + public StreamProxyItem one(String app, String stream){ + + return streamProxyService.getStreamProxyByAppAndStream(app, stream); + } + @Operation(summary = "保存代理", parameters = { @Parameter(name = "param", description = "代理参数", required = true), }) @@ -86,6 +96,10 @@ public class StreamProxyController { if (ObjectUtils.isEmpty(param.getGbId())) { param.setGbId(null); } + StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); + if (streamProxyItem != null) { + streamProxyService.del(param.getApp(), param.getStream()); + } RequestMessage requestMessage = new RequestMessage(); String key = DeferredResultHolder.CALLBACK_CMD_PROXY + param.getApp() + param.getStream();