From d9d8aaca6e3311aea47eb7d8232e1252736ae12e Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Sun, 21 Jan 2024 15:19:31 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8B=89=E6=B5=81=E4=BB=A3=E7=90=86=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E6=9C=AA=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/GB28181ResourceServiceImpl.java | 6 +- .../vmp/service/IStreamProxyPlayService.java | 21 ++ .../iot/vmp/service/IStreamProxyService.java | 6 +- .../impl/StreamProxyPlayServiceImpl.java | 195 ++++++++++++++++++ .../impl/StreamProxyResourceServiceImpl.java | 57 ++++- .../service/impl/StreamProxyServiceImpl.java | 139 ++----------- .../iot/vmp/storager/IRedisCatchStorage.java | 2 + .../vmp/storager/dao/StreamProxyMapper.java | 2 + .../storager/impl/RedisCatchStorageImpl.java | 14 ++ 9 files changed, 310 insertions(+), 132 deletions(-) create mode 100755 src/main/java/com/genersoft/iot/vmp/service/IStreamProxyPlayService.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyPlayServiceImpl.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java index 5d2635e2..3f777acb 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java @@ -76,10 +76,8 @@ public class GB28181ResourceServiceImpl implements IResourceService { MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device); playService.play(mediaServerItem, channel.getDeviceId(), channel.getChannelId(), null, (code, msg, data) -> { if (code == InviteErrorCode.SUCCESS.getCode()) { - if (data != null) { - StreamInfo streamInfo = (StreamInfo)data; - callback.call(commonGbChannel, mediaServerItem, ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); - } + StreamInfo streamInfo = (StreamInfo)data; + callback.call(commonGbChannel, mediaServerItem, ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); }else { callback.call(commonGbChannel, null, code, msg, null); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyPlayService.java new file mode 100755 index 00000000..51f8d605 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyPlayService.java @@ -0,0 +1,21 @@ +package com.genersoft.iot.vmp.service; + +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.GeneralCallback; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; +import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; +import com.github.pagehelper.PageInfo; + +import java.util.List; + +public interface IStreamProxyPlayService { + + + void startProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback callback); + + + void stopProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback callback); +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java index 899665da..b59d4465 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java @@ -18,7 +18,6 @@ public interface IStreamProxyService { */ void save(StreamProxy param, GeneralCallback callback); - /** * 分页查询 */ @@ -97,4 +96,9 @@ public interface IStreamProxyService { * 播放代理流 */ void play(Integer id, GeneralCallback callback); + + /** + * 根据通用通道查询拉流代理 + */ + StreamProxy getStreamProxyByCommonGbChannelId(int commonGbId); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyPlayServiceImpl.java new file mode 100644 index 00000000..da842b30 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyPlayServiceImpl.java @@ -0,0 +1,195 @@ +package com.genersoft.iot.vmp.service.impl; + +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.GeneralCallback; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IMediaService; +import com.genersoft.iot.vmp.service.IStreamProxyPlayService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.stereotype.Service; +import org.springframework.transaction.TransactionDefinition; + +import java.util.UUID; + +@Service +public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService { + + private final static Logger logger = LoggerFactory.getLogger(StreamProxyPlayServiceImpl.class); + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private IMediaService mediaService; + + @Autowired + private ZLMRESTfulUtils zlmresTfulUtils; + + @Autowired + private UserSetting userSetting; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + + @Autowired + private DynamicTask dynamicTask; + + @Autowired + DataSourceTransactionManager dataSourceTransactionManager; + + @Autowired + TransactionDefinition transactionDefinition; + + @Autowired + IRedisCatchStorage redisCatchStorage; + + @Autowired + IMediaServerService mediaServerService; + + @Autowired + StreamProxyMapper streamProxyMapper; + + @Override + public void startProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback callback) { + logger.info("[开始拉流代理] {}/{}", streamProxy.getApp(), streamProxy.getStream()); + + OnStreamChangedHookParam streamChangedHookParam = redisCatchStorage.getProxyStreamInfo(streamProxy.getApp(), streamProxy.getStream(), null); + if (streamChangedHookParam != null) { + MediaServerItem serverItemInCatch = mediaServerService.getOne(streamChangedHookParam.getMediaServerId()); + if (serverItemInCatch != null) { + // 检测是否在线 + boolean ready = mediaService.isReady(serverItemInCatch, streamProxy.getApp(), streamProxy.getStream()); + if (ready) { + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( + mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null); + logger.info("[开始拉流代理] 已拉起,直接返回 {}/{}", streamProxy.getApp(), streamProxy.getStream()); + if (callback != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + } + return; + }else { + redisCatchStorage.removeStream(streamChangedHookParam.getMediaServerId(), "PULL", streamChangedHookParam.getApp(), + streamChangedHookParam.getStream()); + } + }else { + redisCatchStorage.removeStream(streamChangedHookParam.getMediaServerId(), "PULL", streamChangedHookParam.getApp(), + streamChangedHookParam.getStream()); + } + } + + if (streamProxy.getStreamKey() != null) { + zlmresTfulUtils.delStreamProxy(mediaInfo, streamProxy.getStreamKey()); + } + + String delayTalkKey = UUID.randomUUID().toString(); + + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(streamProxy.getApp(), streamProxy.getStream(), true, "rtsp", mediaInfo.getId()); + hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { + dynamicTask.stop(delayTalkKey); + streamProxy.setPulling(true); + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( + mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null); + logger.info("[开始拉流代理] 成功: {}/{}", streamProxy.getApp(), streamProxy.getStream()); + if (callback != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + } + streamProxyMapper.update(streamProxy); + }); + + dynamicTask.startDelay(delayTalkKey, ()->{ + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + dynamicTask.stop(delayTalkKey); + if (callback != null) { + callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null); + } + streamProxy.setProxyError("启用超时"); + streamProxyMapper.update(streamProxy); + }, 10000); + JSONObject result; + if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())){ + result = zlmresTfulUtils.addFFmpegSource(mediaInfo, streamProxy.getUrl().trim(), streamProxy.getDstUrl(), + streamProxy.getTimeoutMs() + "", streamProxy.isEnableAudio(), streamProxy.isEnableMp4(), + streamProxy.getFfmpegCmdKey()); + }else { + result = zlmresTfulUtils.addStreamProxy(mediaInfo, streamProxy.getApp(), streamProxy.getStream(), streamProxy.getUrl().trim(), + streamProxy.isEnableAudio(), streamProxy.isEnableMp4(), streamProxy.getRtpType()); + } + if (result == null) { + if (callback != null) { + callback.run(ErrorCode.ERROR100.getCode(), "接口调用失败", null); + } + return; + } + if (result.getInteger("code") != 0) { + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + dynamicTask.stop(delayTalkKey); + if (callback != null) { + callback.run(result.getInteger("code"), result.getString("msg"), null); + } + }else { + JSONObject data = result.getJSONObject("data"); + if (data == null) { + logger.warn("[获取拉流代理的结果数据Data] 失败: {}", result ); + if (callback != null) { + callback.run(result.getInteger("code"), result.getString("msg"), null); + } + return; + } + String key = data.getString("key"); + if (key == null) { + logger.warn("[获取拉流代理的结果数据Data中的KEY] 失败: {}", result ); + if (callback != null) { + callback.run(ErrorCode.ERROR100.getCode(), "获取代理流结果中的KEY失败", null); + } + return; + } + streamProxy.setStreamKey(key); + streamProxyMapper.update(streamProxy); + } + } + + @Override + public void stopProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback callback) { + logger.info("[停止拉流代理] {}/{}", streamProxy.getApp(), streamProxy.getStream()); + boolean ready = mediaService.isReady(mediaInfo, streamProxy.getApp(), streamProxy.getStream()); + if (ready) { + if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())){ + zlmresTfulUtils.delFFmpegSource(mediaInfo, streamProxy.getStreamKey()); + }else { + zlmresTfulUtils.delStreamProxy(mediaInfo, streamProxy.getStreamKey()); + } + mediaService.closeStream(mediaInfo, streamProxy.getApp(), streamProxy.getStream()); + } + // 检查redis内容是否正确 + String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + + OriginType.PULL + "_" + streamProxy.getApp() + "_" + streamProxy.getStream() + "_" + + mediaInfo.getId(); + + if (redisTemplate.opsForValue().get(key) == null) { + redisTemplate.delete(key); + } + logger.info("[停止拉流代理] 成功 {}/{}", streamProxy.getApp(), streamProxy.getStream()); + if (callback != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null); + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyResourceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyResourceServiceImpl.java index a1a14d79..eb6ce815 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyResourceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyResourceServiceImpl.java @@ -1,13 +1,35 @@ package com.genersoft.iot.vmp.service.impl; import com.genersoft.iot.vmp.common.CommonGbChannel; -import com.genersoft.iot.vmp.service.IResourcePlayCallback; -import com.genersoft.iot.vmp.service.IResourceService; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.gb28181.GB28181ResourceServiceImpl; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; +import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.CommonGbChannelType; +import com.genersoft.iot.vmp.service.bean.InviteErrorCode; +import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.util.ObjectUtils; @Service(CommonGbChannelType.PROXY) public class StreamProxyResourceServiceImpl implements IResourceService { + + private final Logger logger = LoggerFactory.getLogger(StreamProxyResourceServiceImpl.class); + + @Autowired + private StreamProxyMapper streamProxyMapper; + + @Autowired + private IStreamProxyPlayService streamProxyPlayService; + + @Autowired + private IMediaServerService mediaServerService; + @Override public boolean deleteChannel(CommonGbChannel commonGbChannel) { return false; @@ -15,7 +37,36 @@ public class StreamProxyResourceServiceImpl implements IResourceService { @Override public void startPlay(CommonGbChannel commonGbChannel, IResourcePlayCallback callback) { - + assert callback != null; + if (!CommonGbChannelType.PROXY.equals(commonGbChannel.getType())) { + logger.warn("[资源类-拉流代理] 收到播放通道: {} 时发现类型不为proxy", commonGbChannel.getCommonGbId()); + callback.call(commonGbChannel, null, ErrorCode.ERROR100.getCode(), "数据关系错误", null); + return; + } + StreamProxy streamProxy = streamProxyMapper.selectOneByByCommonGbChannelId(commonGbChannel.getCommonGbId()); + if (streamProxy == null) { + logger.warn("[资源类-拉流代理] 收到播放通道: {} 时未找到国标通道", commonGbChannel.getCommonGbId()); + callback.call(commonGbChannel, null, ErrorCode.ERROR100.getCode(), "未找到通道", null); + return; + } + String mediaServerId = streamProxy.getMediaServerId(); + MediaServerItem mediaServerItem; + if (ObjectUtils.isEmpty(mediaServerId) || mediaServerId.equals("auto")) { + mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); + }else { + mediaServerItem = mediaServerService.getOne(mediaServerId); + } + if (mediaServerItem == null) { + callback.call(commonGbChannel, null, ErrorCode.ERROR100.getCode(), "未找到可用的节点", null); + return; + } + streamProxyPlayService.startProxy(streamProxy, mediaServerItem, (code, msg, data) -> { + if (code == InviteErrorCode.SUCCESS.getCode()) { + callback.call(commonGbChannel, mediaServerItem, ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data); + }else { + callback.call(commonGbChannel, null, code, msg, null); + } + }); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 3ea5b174..8317f8b3 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -17,10 +17,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; -import com.genersoft.iot.vmp.service.ICommonGbChannelService; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.IMediaService; -import com.genersoft.iot.vmp.service.IStreamProxyService; +import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper; @@ -85,7 +82,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { private ZlmHttpHookSubscribe hookSubscribe; @Autowired - private DynamicTask dynamicTask; + private IStreamProxyPlayService streamProxyPlayService; @Autowired DataSourceTransactionManager dataSourceTransactionManager; @@ -169,7 +166,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); } }); - startProxy(param, mediaInfo, (code, msg, data) -> { + streamProxyPlayService.startProxy(param, mediaInfo, (code, msg, data) -> { if (code != ErrorCode.SUCCESS.getCode()) { if (callback != null) { callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data); @@ -257,127 +254,15 @@ public class StreamProxyServiceImpl implements IStreamProxyService { updateProxyToDb(param); }; if(stopOldProxy) { - stopProxy(param, mediaInfo, (code, msg, data) -> { + streamProxyPlayService.stopProxy(param, mediaInfo, (code, msg, data) -> { if (param.isPulling()) { - startProxy(param, mediaInfo, startProxyCallback); + streamProxyPlayService.startProxy(param, mediaInfo, callback); } }); } }); } - public void startProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback callback) { - logger.info("[开始拉流代理] {}/{}", streamProxy.getApp(), streamProxy.getStream()); - // 检测是否在线 - boolean ready = mediaService.isReady(mediaInfo, streamProxy.getApp(), streamProxy.getStream()); - if (ready) { - // 检查redis内容是否正确 - String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" - + OriginType.PULL + "_" + streamProxy.getApp() + "_" + streamProxy.getStream() + "_" - + mediaInfo.getId(); - - if (redisTemplate.opsForValue().get(key) == null) { - logger.info("[拉起代理] 发现redis的流信息不存在,但是流存在。关闭流"); - mediaService.closeStream(mediaInfo, streamProxy.getApp(), streamProxy.getStream()); - }else { - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( - mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null); - logger.info("[开始拉流代理] 已拉起,直接返回 {}/{}", streamProxy.getApp(), streamProxy.getStream()); - if (callback != null) { - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); - } - return; - } - } - String delayTalkKey = UUID.randomUUID().toString(); - - HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(streamProxy.getApp(), streamProxy.getStream(), true, "rtsp", mediaInfo.getId()); - hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { - dynamicTask.stop(delayTalkKey); - streamProxy.setPulling(true); - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( - mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null); - logger.info("[开始拉流代理] 成功: {}/{}", streamProxy.getApp(), streamProxy.getStream()); - if (callback != null) { - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); - } - }); - - dynamicTask.startDelay(delayTalkKey, ()->{ - hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); - dynamicTask.stop(delayTalkKey); - if (callback != null) { - callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null); - } - streamProxy.setProxyError("启用超时"); - }, 10000); - JSONObject result; - if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())){ - result = zlmresTfulUtils.addFFmpegSource(mediaInfo, streamProxy.getUrl().trim(), streamProxy.getDstUrl(), - streamProxy.getTimeoutMs() + "", streamProxy.isEnableAudio(), streamProxy.isEnableMp4(), - streamProxy.getFfmpegCmdKey()); - }else { - result = zlmresTfulUtils.addStreamProxy(mediaInfo, streamProxy.getApp(), streamProxy.getStream(), streamProxy.getUrl().trim(), - streamProxy.isEnableAudio(), streamProxy.isEnableMp4(), streamProxy.getRtpType()); - } - if (result == null) { - if (callback != null) { - callback.run(ErrorCode.ERROR100.getCode(), "接口调用失败", null); - } - return; - } - if (result.getInteger("code") != 0) { - hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); - dynamicTask.stop(delayTalkKey); - if (callback != null) { - callback.run(result.getInteger("code"), result.getString("msg"), null); - } - }else { - JSONObject data = result.getJSONObject("data"); - if (data == null) { - logger.warn("[获取拉流代理的结果数据Data] 失败: {}", result ); - if (callback != null) { - callback.run(result.getInteger("code"), result.getString("msg"), null); - } - return; - } - String key = data.getString("key"); - if (key == null) { - logger.warn("[获取拉流代理的结果数据Data中的KEY] 失败: {}", result ); - if (callback != null) { - callback.run(ErrorCode.ERROR100.getCode(), "获取代理流结果中的KEY失败", null); - } - return; - } - streamProxy.setStreamKey(key); - } - } - - public void stopProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback callback) { - logger.info("[停止拉流代理] {}/{}", streamProxy.getApp(), streamProxy.getStream()); - boolean ready = mediaService.isReady(mediaInfo, streamProxy.getApp(), streamProxy.getStream()); - if (ready) { - if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())){ - zlmresTfulUtils.delFFmpegSource(mediaInfo, streamProxy.getStreamKey()); - }else { - zlmresTfulUtils.delStreamProxy(mediaInfo, streamProxy.getStreamKey()); - } - mediaService.closeStream(mediaInfo, streamProxy.getApp(), streamProxy.getStream()); - } - // 检查redis内容是否正确 - String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" - + OriginType.PULL + "_" + streamProxy.getApp() + "_" + streamProxy.getStream() + "_" - + mediaInfo.getId(); - - if (redisTemplate.opsForValue().get(key) == null) { - redisTemplate.delete(key); - } - logger.info("[停止拉流代理] 成功 {}/{}", streamProxy.getApp(), streamProxy.getStream()); - if (callback != null) { - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null); - } - } - public void proxyParamHandler(StreamProxy param) { if ("ffmpeg".equalsIgnoreCase(param.getType())) { if (ObjectUtils.isEmpty(param.getDstUrl())) { @@ -515,7 +400,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { if (mediaServerItem != null) { boolean ready = mediaService.isReady(mediaServerItem, streamProxy.getApp(), streamProxy.getStream()); if (ready) { - stopProxy(streamProxy, mediaServerItem, (code, msg, data) -> { + streamProxyPlayService.stopProxy(streamProxy, mediaServerItem, (code, msg, data) -> { if (code == ErrorCode.SUCCESS.getCode()) { logger.info("[移除代理]: 代理: {}/{}, 从zlm移除成功", streamProxy.getApp(), streamProxy.getStream()); }else { @@ -546,7 +431,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } return; } - startProxy(streamProxy, mediaServerItem, (code, msg, data) -> { + streamProxyPlayService.startProxy(streamProxy, mediaServerItem, (code, msg, data) -> { if (code == ErrorCode.SUCCESS.getCode()) { streamProxy.setPulling(true); }else { @@ -578,7 +463,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } return; } - stopProxy(streamProxy, mediaServerItem, (code, msg, data) -> { + streamProxyPlayService.stopProxy(streamProxy, mediaServerItem, (code, msg, data) -> { streamProxy.setPulling(false); streamProxy.setUpdateTime(DateUtil.getNow()); updateProxyToDb(streamProxy); @@ -818,7 +703,13 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } if (mediaServerItem == null && callback != null) { callback.run(ErrorCode.ERROR100.getCode(), "未找到可用的节点", null); + return; } - startProxy(streamProxy, mediaServerItem, callback); + streamProxyPlayService.startProxy(streamProxy, mediaServerItem, callback); + } + + @Override + public StreamProxy getStreamProxyByCommonGbChannelId(int commonGbId) { + return streamProxyMapper.selectOneByByCommonGbChannelId(commonGbId); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index b663c5c6..9b0f93b2 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -134,6 +134,8 @@ public interface IRedisCatchStorage { OnStreamChangedHookParam getStreamInfo(String app, String streamId, String mediaServerId); + OnStreamChangedHookParam getProxyStreamInfo(String app, String streamId, String mediaServerId); + void addCpuInfo(double cpuInfo); void addMemInfo(double memInfo); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java index cb3e7565..561977da 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java @@ -104,4 +104,6 @@ public interface StreamProxyMapper { @Delete("delete from wvp_stream_proxy WHERE id=#{id}") void delById(int id); + @Select("SELECT * from wvp_stream_proxy WHERE common_gb_channel_id=#{commonGbId}") + StreamProxy selectOneByByCommonGbChannelId(@Param("commonGbId") int commonGbId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index a8146430..aceeb88e 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -477,6 +477,20 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { return result; } + @Override + public OnStreamChangedHookParam getProxyStreamInfo(String app, String streamId, String mediaServerId) { + String scanKey = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_PULL_" + app + "_" + streamId + "_" + mediaServerId; + + OnStreamChangedHookParam result = null; + List keys = RedisUtil.scan(redisTemplate, scanKey); + if (keys.size() > 0) { + String key = (String) keys.get(0); + result = JsonUtil.redisJsonToObject(redisTemplate, key, OnStreamChangedHookParam.class); + } + + return result; + } + @Override public void addCpuInfo(double cpuInfo) { String key = VideoManagerConstants.SYSTEM_INFO_CPU_PREFIX + userSetting.getServerId();