From 1d28869fd3daa1e4082eccc7703662c8283dda49 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 12 Jan 2024 18:26:14 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8B=89=E6=B5=81=E4=BB=A3?= =?UTF-8?q?=E7=90=86=E6=8E=A5=E5=8F=A3=E5=93=A6i?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../request/impl/InviteRequestProcessor.java | 23 +- .../vmp/media/zlm/ZLMHttpHookListener.java | 14 +- .../iot/vmp/service/IStreamProxyService.java | 34 +- .../service/impl/StreamProxyServiceImpl.java | 455 +++++++----------- .../vmp/storager/dao/StreamProxyMapper.java | 5 + .../gb28181/media/MediaController.java | 4 +- .../streamProxy/StreamProxyController.java | 35 +- 7 files changed, 225 insertions(+), 345 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 28f8838f..69062647 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -29,6 +29,7 @@ import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import gov.nist.javax.sdp.TimeDescriptionImpl; import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sip.message.SIPRequest; @@ -643,19 +644,17 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream()); zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); }, userSetting.getPlatformPlayTimeout()); - boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); - if (!start) { - try { - responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline"); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); + streamProxyService.start(gbStream.getApp(), gbStream.getStream(), (code, msg, data) -> { + if (code != ErrorCode.SUCCESS.getCode()) { + try { + responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); + } + zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); + dynamicTask.stop(callIdHeader.getCallId()); } - zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); - dynamicTask.stop(callIdHeader.getCallId()); - } - - - + }); } else if ("push".equals(gbStream.getStreamType())) { if (!platform.isStartOfflinePush()) { // 平台设置中关闭了拉起离线的推流则直接回复 diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 37a95329..74edf5fa 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -374,10 +374,7 @@ public class ZLMHttpHookListener { mediaServerService.removeCount(param.getMediaServerId()); } // 设置拉流代理上线/离线 - int updateStatusResult = streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream()); - if (updateStatusResult > 0) { - - } + streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream()); if ("rtp".equals(param.getApp()) && !param.isRegist()) { InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream()); @@ -546,14 +543,13 @@ public class ZLMHttpHookListener { if (streamProxyItem.isEnableRemoveNoneReader()) { // 无人观看自动移除 ret.put("close", true); - streamProxyService.del(param.getApp(), param.getStream()); - String url = streamProxyItem.getUrl() != null ? streamProxyItem.getUrl() : streamProxyItem.getUrl(); - logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", param.getApp(), param.getStream(), url); + streamProxyService.removeProxy(streamProxyItem.getId()); + logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", param.getApp(), param.getStream(), streamProxyItem.getUrl()); } else if (streamProxyItem.isEnableDisableNoneReader()) { // 无人观看停用 ret.put("close", true); // 修改数据 - streamProxyService.stop(param.getApp(), param.getStream()); + streamProxyService.stop(param.getApp(), param.getStream(), null); } else { // 无人观看不做处理 ret.put("close", false); @@ -682,7 +678,7 @@ public class ZLMHttpHookListener { // 拉流代理 StreamProxy streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) { - streamProxyService.start(param.getApp(), param.getStream()); + streamProxyService.start(param.getApp(), param.getStream(), null); } DeferredResult result = new DeferredResult<>(); result.setResult(HookResult.SUCCESS()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java index d7a6aed1..4be38e9a 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java @@ -19,19 +19,6 @@ public interface IStreamProxyService { */ void save(StreamProxy param, GeneralCallback callback); - /** - * 添加视频代理到zlm - * @param param - * @return - */ - JSONObject addStreamProxyToZlm(StreamProxy param); - - /** - * 从zlm移除视频代理 - * @param param - * @return - */ - JSONObject removeStreamProxyFromZlm(StreamProxy param); /** * 分页查询 @@ -41,12 +28,6 @@ public interface IStreamProxyService { */ PageInfo getAll(Integer page, Integer count); - /** - * 删除视频代理 - * @param app - * @param stream - */ - void del(String app, String stream); /** * 启用视频代理 @@ -54,7 +35,7 @@ public interface IStreamProxyService { * @param stream * @return */ - boolean start(String app, String stream); + void start(String app, String stream, GeneralCallback callback); /** * 更新状态 @@ -62,7 +43,7 @@ public interface IStreamProxyService { * @param app * @param stream */ - int updateStatus(boolean status, String app, String stream); + void updateStatus(boolean status, String app, String stream); @@ -72,7 +53,7 @@ public interface IStreamProxyService { * @param stream * @return */ - boolean stop(String app, String stream); + void stop(String app, String stream, GeneralCallback callback); /** * 获取ffmpeg.cmd模板 @@ -101,13 +82,6 @@ public interface IStreamProxyService { */ void zlmServerOffline(String mediaServerId); - void clean(); - - /** - * 更新代理流 - */ - boolean updateStreamProxy(StreamProxy streamProxyItem); - /** * 获取统计信息 * @return @@ -129,7 +103,7 @@ public interface IStreamProxyService { */ void add(StreamProxy param, GeneralCallback callback); - void delById(int gbId); + void removeProxy(int gbId); /** * 编辑拉流代理 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index eb7c3887..4430429b 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -70,18 +70,12 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Autowired private ZLMRESTfulUtils zlmresTfulUtils; - @Autowired - private ZLMServerFactory zlmServerFactory; - @Autowired private StreamProxyMapper streamProxyMapper; @Autowired private IRedisCatchStorage redisCatchStorage; - @Autowired - private IVideoManagerStorage storager; - @Autowired private UserSetting userSetting; @@ -183,9 +177,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { param.setEnable(false); // 直接移除 if (param.isEnableRemoveNoneReader()) { - del(param.getApp(), param.getStream()); + delProxyFromDb(param.getApp(), param.getStream()); }else { - updateStreamProxy(param); + updateProxyToDb(param); } } }); @@ -209,30 +203,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { logger.warn("[添加拉流代理] 未找到在线的ZLM..."); throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到可用的ZLM"); } - if ("ffmpeg".equalsIgnoreCase(param.getType())) { - if (ObjectUtils.isEmpty(param.getDstUrl())) { - logger.warn("[添加拉流代理] 未设置目标URL地址(DstUrl)"); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "未设置目标URL地址"); - } - logger.info("[添加拉流代理] ffmpeg,源地址: {}, 目标地址为:{}", param.getUrl(), param.getDstUrl()); - if (ObjectUtils.isEmpty(param.getApp()) || ObjectUtils.isEmpty(param.getStream())) { - try { - URL url = new URL(param.getDstUrl()); - String path = url.getPath(); - if (path.indexOf("/", 1) < 0) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败, 至少两层路径"); - } - String app = path.substring(1, path.indexOf("/", 2)); - String stream = path.substring(path.indexOf("/", 2) + 1); - param.setApp(app); - param.setStream(stream); - } catch (MalformedURLException e) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败"); - } - } - }else { - logger.info("[添加拉流代理] 直接拉流,源地址: {}, app: {}, stream: {}", param.getUrl(), param.getApp(), param.getStream()); - } + proxyParamHandler(param); param.setMediaServerId(mediaInfo.getId()); StreamProxy streamProxyInDb = streamProxyMapper.selectOne(param.getApp(), param.getStream()); if (streamProxyInDb != null) { @@ -262,6 +233,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } @Override + @Transactional public void edit(StreamProxy param, GeneralCallback callback) { MediaServerItem mediaInfo; StreamProxy streamProxyInDb = streamProxyMapper.selectOneById(param.getId()); @@ -277,6 +249,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService { logger.warn("[编辑拉流代理] 未找到在线的ZLM..."); throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到可用的ZLM"); } + proxyParamHandler(param); + param.setMediaServerId(mediaInfo.getId()); // 国标编号发生变化,修改通用通道国标变化,通用通道应发送删除再发送添加命令通知上级 // 类型变化,启用->启用:需要重新拉起视频流, 启用->未启用: 停止旧的视频流, 未启用->启用:发起新的视频流 // 流地址发生变化。停止旧的,拉起新的 @@ -293,116 +267,39 @@ public class StreamProxyServiceImpl implements IStreamProxyService { // 如果是开启代理这是开启代理结束后的回调 final GeneralCallback startProxyCallback = (code, msg, data) -> { - + if (code == ErrorCode.SUCCESS.getCode()) { + param.setStatus(true); + } else { + param.setStatus(false); + if (param.isEnableRemoveNoneReader()) { + return; + } + param.setProxyError(msg); + } + updateProxyToDb(param); + callback.run(code, msg, null); }; - - if (stopOldProxy) { + if(stopOldProxy) { stopProxy(param, mediaInfo, (code, msg, data) -> { if (code == ErrorCode.SUCCESS.getCode()) { if (param.isEnable()) { startProxy(param, mediaInfo, startProxyCallback); + }else { + callback.run(code, msg, null); } + }else { + callback.run(code, "停止旧的代理失败: " + msg, null); } }); }else { if (param.isEnable()) { startProxy(param, mediaInfo, startProxyCallback); - } - } - - - - - if (ObjectUtils.isEmpty(streamProxyInDb.getGbId())) { - if (!ObjectUtils.isEmpty(param.getGbId())) { - // 之前是空的,现在添加了国标编号 - - } - }else { - if (ObjectUtils.isEmpty(param.getGbId())) { - // 移除了国标编号 }else { - if (!streamProxyInDb.getGbId().equals(param.getGbId())) { - // 修改了国标编号 - } + param.setStatus(false); + updateProxyToDb(param); + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null); } } - - - - if ("ffmpeg".equalsIgnoreCase(param.getType())) { - if (ObjectUtils.isEmpty(param.getDstUrl())) { - logger.warn("[添加拉流代理] 未设置目标URL地址(DstUrl)"); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "未设置目标URL地址"); - } - logger.info("[添加拉流代理] ffmpeg,源地址: {}, 目标地址为:{}", param.getUrl(), param.getDstUrl()); - if (ObjectUtils.isEmpty(param.getApp()) || ObjectUtils.isEmpty(param.getStream())) { - try { - URL url = new URL(param.getDstUrl()); - String path = url.getPath(); - if (path.indexOf("/", 1) < 0) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败, 至少两层路径"); - } - String app = path.substring(1, path.indexOf("/", 2)); - String stream = path.substring(path.indexOf("/", 2) + 1); - param.setApp(app); - param.setStream(stream); - } catch (MalformedURLException e) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败"); - } - } - }else { - logger.info("[添加拉流代理] 直接拉流,源地址: {}, app: {}, stream: {}", param.getUrl(), param.getApp(), param.getStream()); - } - param.setMediaServerId(mediaInfo.getId()); - StreamProxy streamProxyInDb = streamProxyMapper.selectOne(param.getApp(), param.getStream()); - if (streamProxyInDb != null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "app/stream已经存在"); - } - if (!param.isEnable()) { - param.setStatus(false); - addProxyToDb(param); - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( - mediaInfo, param.getApp(), param.getStream(), null, null); - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); - return; - } - String talkKey = UUID.randomUUID().toString(); - String delayTalkKey = UUID.randomUUID().toString(); - - HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId()); - hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { - dynamicTask.stop(talkKey); - param.setStatus(true); - addProxyToDb(param); - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( - mediaInfo, param.getApp(), param.getStream(), null, null); - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); - }); - - dynamicTask.startDelay(delayTalkKey, ()->{ - hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); - dynamicTask.stop(talkKey); - callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null); - if (param.isEnableRemoveNoneReader()) { - return; - } - param.setProxyError("启用超时"); - param.setStatus(false); - addProxyToDb(param); - }, 10000); - JSONObject jsonObject = addStreamProxyToZlm(param); - if (jsonObject != null && jsonObject.getInteger("code") != 0) { - hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); - dynamicTask.stop(talkKey); - callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null); - if (param.isEnableRemoveNoneReader()) { - return; - } - param.setProxyError("启用失败: " + jsonObject.getString("msg")); - param.setStatus(false); - addProxyToDb(param); - } } public void startProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback callback) { @@ -433,6 +330,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { streamProxy.setStatus(true); StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null); + logger.info("[拉流代理] 启用成功: {}/{}", streamProxy.getApp(), streamProxy.getStream()); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); }); @@ -479,6 +377,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService { public void stopProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback callback) { boolean ready = mediaService.isReady(mediaInfo, streamProxy.getApp(), streamProxy.getStream()); if (ready) { + if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())){ + zlmresTfulUtils.delFFmpegSource(mediaInfo, streamProxy.getStreamKey()); + }else { + zlmresTfulUtils.delStreamProxy(mediaInfo, streamProxy.getStreamKey()); + } mediaService.closeStream(mediaInfo, streamProxy.getApp(), streamProxy.getStream()); } // 检查redis内容是否正确 @@ -492,6 +395,32 @@ public class StreamProxyServiceImpl implements IStreamProxyService { callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null); } + public void proxyParamHandler(StreamProxy param) { + if ("ffmpeg".equalsIgnoreCase(param.getType())) { + if (ObjectUtils.isEmpty(param.getDstUrl())) { + logger.warn("[拉流代理参数处理] 未设置目标URL地址(DstUrl)"); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未设置目标URL地址"); + } + logger.info("[拉流代理参数处理] ffmpeg,源地址: {}, 目标地址为:{}", param.getUrl(), param.getDstUrl()); + if (ObjectUtils.isEmpty(param.getApp()) || ObjectUtils.isEmpty(param.getStream())) { + try { + URL url = new URL(param.getDstUrl()); + String path = url.getPath(); + if (path.indexOf("/", 1) < 0) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败, 至少两层路径"); + } + String app = path.substring(1, path.indexOf("/", 2)); + String stream = path.substring(path.indexOf("/", 2) + 1); + param.setApp(app); + param.setStream(stream); + } catch (MalformedURLException e) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败"); + } + } + }else { + logger.info("[拉流代理参数处理] 直接拉流,源地址: {}, app: {}, stream: {}", param.getUrl(), param.getApp(), param.getStream()); + } + } private void addProxyToDb(StreamProxy param) { // 未启用的数据可以直接保存了 @@ -535,15 +464,19 @@ public class StreamProxyServiceImpl implements IStreamProxyService { param.setCommonGbChannelId(commonGbChannel.getCommonGbId()); } } + if (!param.getGbId().equals(streamProxyInDb.getGbId())) { + CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(param); + commonGbChannel.setCommonGbId(streamProxyInDb.getCommonGbChannelId()); + // 国标ID已经改变 + commonGbChannelService.update(commonGbChannel); + } param.setUpdateTime(DateUtil.getNow()); - param.setStatus(streamProxyInDb.isStatus()); - int addStreamProxyResult = streamProxyMapper.add(param); + int addStreamProxyResult = streamProxyMapper.update(param); if (addStreamProxyResult <= 0) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "添加拉流代理失败"); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存拉流代理失败"); } } - private String getSchemaFromFFmpegCmd(String ffmpegCmd) { ffmpegCmd = ffmpegCmd.replaceAll(" + ", " "); String[] paramArray = ffmpegCmd.split(" "); @@ -563,73 +496,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService { return null; } - /** - * 更新代理流 - * @param streamProxyItem - * @return - */ - @Override - public boolean updateStreamProxy(StreamProxy streamProxyItem) { - streamProxyItem.setCreateTime(DateUtil.getNow()); - return streamProxyMapper.update(streamProxyItem) > 0; - } - - @Override - public JSONObject addStreamProxyToZlm(StreamProxy param) { - JSONObject result = null; - MediaServerItem mediaServerItem = null; - if (param.getMediaServerId() == null) { - logger.warn("添加代理时MediaServerId 为null"); - return null; - }else { - mediaServerItem = mediaServerService.getOne(param.getMediaServerId()); - } - if (mediaServerItem == null) { - return null; - } - if (zlmServerFactory.isStreamReady(mediaServerItem, param.getApp(), param.getStream())) { - zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream()); - } - if ("ffmpeg".equalsIgnoreCase(param.getType())){ - result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getUrl().trim(), param.getDstUrl(), - param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(), - param.getFfmpegCmdKey()); - }else { - result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(), - param.isEnableAudio(), param.isEnableMp4(), param.getRtpType()); - } - if (result != null && result.getInteger("code") == 0) { - JSONObject data = result.getJSONObject("data"); - if (data == null) { - logger.warn("[获取拉流代理的结果数据Data] 失败: {}", result ); - return result; - } - String key = data.getString("key"); - if (key == null) { - logger.warn("[获取拉流代理的结果数据Data中的KEY] 失败: {}", result ); - return result; - } - param.setStreamKey(key); - streamProxyMapper.update(param); - } - return result; - } - - @Override - public JSONObject removeStreamProxyFromZlm(StreamProxy param) { - if (param ==null) { - return null; - } - MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId()); - JSONObject result = null; - if ("ffmpeg".equalsIgnoreCase(param.getType())){ - result = zlmresTfulUtils.delFFmpegSource(mediaServerItem, param.getStreamKey()); - }else { - result = zlmresTfulUtils.delStreamProxy(mediaServerItem, param.getStreamKey()); - } - return result; - } - @Override public PageInfo getAll(Integer page, Integer count) { PageHelper.startPage(page, count); @@ -637,8 +503,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { return new PageInfo<>(all); } - @Override - public void del(String app, String stream) { + private void delProxyFromDb(String app, String stream) { StreamProxy streamProxyItem = streamProxyMapper.selectOne(app, stream); if (streamProxyItem == null) { return; @@ -648,70 +513,80 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } streamProxyMapper.delById(streamProxyItem.getId()); redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream); - - JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem); - if (jsonObject != null && jsonObject.getInteger("code") == 0) { - logger.info("[移除代理]: 代理: {}/{}, 从zlm移除成功", app, stream); - }else { - logger.info("[移除代理]: 代理: {}/{}, 从zlm移除失败", app, stream); - } } @Override - public void delById(int id) { - StreamProxy streamProxyItem = streamProxyMapper.selectOneById(id); - if (streamProxyItem == null) { + public void removeProxy(int id) { + StreamProxy streamProxy = streamProxyMapper.selectOneById(id); + if (streamProxy == null) { return; } - if (streamProxyItem.getCommonGbChannelId() > 0) { - commonGbChannelService.deleteById(streamProxyItem.getCommonGbChannelId()); - } - streamProxyMapper.delById(id); - redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", streamProxyItem.getApp(), streamProxyItem.getStream()); - - JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem); - if (jsonObject != null && jsonObject.getInteger("code") == 0) { - logger.info("[移除代理]: 代理: {}/{}, 从zlm移除成功", streamProxyItem.getApp(), streamProxyItem.getStream()); + if (streamProxy.isEnable()) { + String mediaServerId = streamProxy.getMediaServerId(); + MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + if (mediaServerItem != null) { + boolean ready = mediaService.isReady(mediaServerItem, streamProxy.getApp(), streamProxy.getStream()); + if (ready) { + stopProxy(streamProxy, mediaServerItem, (code, msg, data) -> { + if (code == ErrorCode.SUCCESS.getCode()) { + logger.info("[移除代理]: 代理: {}/{}, 从zlm移除成功", streamProxy.getApp(), streamProxy.getStream()); + }else { + logger.info("[移除代理]: 代理: {}/{}, 从zlm移除失败", streamProxy.getApp(), streamProxy.getStream()); + } + }); + } + } }else { - logger.info("[移除代理]: 代理: {}/{}, 从zlm移除失败", streamProxyItem.getApp(), streamProxyItem.getStream()); + delProxyFromDb(streamProxy.getApp(), streamProxy.getStream()); } } @Override - public boolean start(String app, String stream) { - boolean result = false; + @Transactional + public void start(String app, String stream, GeneralCallback callback) { StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream); - if (streamProxy != null && !streamProxy.isEnable() ) { - JSONObject jsonObject = addStreamProxyToZlm(streamProxy); - if (jsonObject == null) { - return false; - } - if (jsonObject.getInteger("code") == 0) { - result = true; - streamProxy.setEnable(true); - updateStreamProxy(streamProxy); - }else { - logger.info("启用代理失败: {}/{}->{}({})", app, stream, jsonObject.getString("msg"), - streamProxy.getUrl() == null? streamProxy.getUrl():streamProxy.getUrl()); - } - } else if (streamProxy != null && streamProxy.isEnable()) { - return true ; + if (streamProxy == null || !streamProxy.isEnable()){ + callback.run(ErrorCode.ERROR100.getCode(), "代理不存在或未启用", null); + return; } - return result; + String mediaServerId = streamProxy.getMediaServerId(); + MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + if (mediaServerItem == null) { + callback.run(ErrorCode.ERROR100.getCode(), "使用的媒体节点不存在", null); + return; + } + startProxy(streamProxy, mediaServerItem, (code, msg, data) -> { + if (code == ErrorCode.SUCCESS.getCode()) { + streamProxy.setStatus(true); + }else { + streamProxy.setStatus(false); + } + streamProxy.setUpdateTime(DateUtil.getNow()); + updateProxyToDb(streamProxy); + callback.run(code, msg, data); + }); } @Override - public boolean stop(String app, String stream) { - boolean result = false; - StreamProxy streamProxyDto = streamProxyMapper.selectOne(app, stream); - if (streamProxyDto != null && streamProxyDto.isEnable()) { - JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyDto); - if (jsonObject != null && jsonObject.getInteger("code") == 0) { - streamProxyDto.setEnable(false); - result = updateStreamProxy(streamProxyDto); - } + @Transactional + public void stop(String app, String stream, GeneralCallback callback) { + StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream); + if (streamProxy == null || !streamProxy.isEnable()){ + callback.run(ErrorCode.ERROR100.getCode(), "代理不存在或未启用", null); + return; } - return result; + String mediaServerId = streamProxy.getMediaServerId(); + MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + if (mediaServerItem == null) { + callback.run(ErrorCode.ERROR100.getCode(), "使用的媒体节点不存在", null); + return; + } + stopProxy(streamProxy, mediaServerItem, (code, msg, data) -> { + streamProxy.setStatus(false); + streamProxy.setUpdateTime(DateUtil.getNow()); + updateProxyToDb(streamProxy); + callback.run(code, msg, data); + }); } @Override @@ -719,7 +594,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { JSONObject result = new JSONObject(); JSONObject mediaServerConfigResuly = zlmresTfulUtils.getMediaServerConfig(mediaServerItem); if (mediaServerConfigResuly != null && mediaServerConfigResuly.getInteger("code") == 0 - && mediaServerConfigResuly.getJSONArray("data").size() > 0){ + && !mediaServerConfigResuly.getJSONArray("data").isEmpty()){ JSONObject mediaServerConfig = mediaServerConfigResuly.getJSONArray("data").getJSONObject(0); for (String key : mediaServerConfig.keySet()) { @@ -738,6 +613,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } @Override + @Transactional public void zlmServerOnline(String mediaServerId) { // 移除开启了无人观看自动移除的流 List streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId); @@ -762,14 +638,34 @@ public class StreamProxyServiceImpl implements IStreamProxyService { mediaServerId, true); for (StreamProxy streamProxyDto : streamProxyListForEnable) { logger.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream()); - JSONObject jsonObject = addStreamProxyToZlm(streamProxyDto); - if (jsonObject == null) { - // 设置为离线 - logger.info("恢复流代理失败" + streamProxyDto.getApp() + "/" + streamProxyDto.getStream()); - updateStatus(false, streamProxyDto.getApp(), streamProxyDto.getStream()); + MediaServerItem mediaServerItem = mediaServerService.getOne(streamProxyDto.getMediaServerId()); + startProxy(streamProxyDto, mediaServerItem, (code, msg, data) -> { + if (code == ErrorCode.ERROR100.getCode()) { + if (!streamProxyDto.isStatus()) { + updateStatusById(streamProxyDto, true); + } + } else { + if (streamProxyDto.isStatus()) { + updateStatusById(streamProxyDto, false); + } + } + + }); + } + } + + @Transactional + public void updateStatusById(StreamProxy streamProxy, boolean status) { + streamProxyMapper.updateStatusById(streamProxy.getId(), status); + if (streamProxy.getCommonGbChannelId() > 0) { + List ids = new ArrayList<>(); + ids.add(streamProxy.getCommonGbChannelId()); + if (status) { + commonGbChannelService.onlineForList(ids); }else { - updateStatus(true, streamProxyDto.getApp(), streamProxyDto.getStream()); + commonGbChannelService.offlineForList(ids); } + } } @@ -811,13 +707,15 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } @Override - public void clean() { - - } - - @Override - public int updateStatus(boolean status, String app, String stream) { - return streamProxyMapper.updateStatus(app, stream, status); + @Transactional + public void updateStatus(boolean status, String app, String stream) { + StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream); + if (streamProxy == null) { + return; + } + if (streamProxy.getCommonGbChannelId() > 0) { + updateStatusById(streamProxy, status); + } } private void syncPullStream(String mediaServerId){ @@ -843,7 +741,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } } } - if (stringStreamInfoMap.size() == 0) { + if (stringStreamInfoMap.isEmpty()) { redisCatchStorage.removeStream(mediaServerId, "PULL"); }else { for (String key : stringStreamInfoMap.keySet()) { @@ -875,40 +773,40 @@ public class StreamProxyServiceImpl implements IStreamProxyService { * 检查拉流代理状态 */ @Scheduled(cron = "* 0/10 * * * ?") + @Transactional public void asyncCheckStreamProxyStatus() { List all = mediaServerService.getAllOnline(); - if (CollectionUtils.isEmpty(all)){ return; } - - Map serverItemMap = all.stream().collect(Collectors.toMap(MediaServerItem::getId, Function.identity(), (m1, m2) -> m1)); - + Map serverItemMap = all.stream().collect( + Collectors.toMap(MediaServerItem::getId, Function.identity(), (m1, m2) -> m1)); List list = getAllForEnable(); if (CollectionUtils.isEmpty(list)){ return; } - for (StreamProxy streamProxyItem : list) { - MediaServerItem mediaServerItem = serverItemMap.get(streamProxyItem.getMediaServerId()); - - // TODO 支持其他 schema - JSONObject mediaInfo = zlmresTfulUtils.isMediaOnline(mediaServerItem, streamProxyItem.getApp(), streamProxyItem.getStream(), "rtsp"); - + JSONObject mediaInfo = zlmresTfulUtils.isMediaOnline(mediaServerItem, streamProxyItem.getApp(), + streamProxyItem.getStream(), "rtsp"); if (mediaInfo == null){ - streamProxyItem.setStatus(false); + if (streamProxyItem.isStatus()) { + updateStatusById(streamProxyItem, false); + } } else { if (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")) { - streamProxyItem.setStatus(true); + if (!streamProxyItem.isStatus()) { + updateStatusById(streamProxyItem, true); + } } else { - streamProxyItem.setStatus(false); + if (streamProxyItem.isStatus()) { + updateStatusById(streamProxyItem, false); + } } } - updateStreamProxy(streamProxyItem); } } @@ -921,4 +819,5 @@ public class StreamProxyServiceImpl implements IStreamProxyService { public List getAllForEnable() { return streamProxyMapper.selectForEnable(true); } + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java index c2d47506..f712e8ab 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java @@ -75,6 +75,11 @@ public interface StreamProxyMapper { "WHERE app=#{app} AND stream=#{stream}") int updateStatus(@Param("app") String app, @Param("stream") String stream, @Param("status") boolean status); + @Update("UPDATE wvp_stream_proxy " + + "SET status=#{status} " + + "WHERE id=#{id}") + int updateStatusById(@Param("id") int id, @Param("status") boolean status); + @Delete("DELETE FROM wvp_stream_proxy WHERE enable_remove_none_reader=true AND media_server_id=#{mediaServerId}") void deleteAutoRemoveItemByMediaServerId(String mediaServerId); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java index 56d192e9..d44b7dcb 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java @@ -94,8 +94,8 @@ public class MediaController { return new StreamContent(streamInfo); }else { //获取流失败,重启拉流后重试一次 - streamProxyService.stop(app,stream); - boolean start = streamProxyService.start(app, stream); + streamProxyService.stop(app,stream, null); + streamProxyService.start(app, stream, null); try { Thread.sleep(1000); } catch (InterruptedException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java index c7443986..d65ef195 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java @@ -97,7 +97,7 @@ public class StreamProxyController { } StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); if (streamProxyItem != null) { - streamProxyService.del(param.getApp(), param.getStream()); + streamProxyService.removeProxy(streamProxyItem.getId()); } DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); @@ -226,7 +226,11 @@ public class StreamProxyController { if (app == null || stream == null) { throw new ControllerException(ErrorCode.ERROR400.getCode(), app == null ?"app不能为null":"stream不能为null"); }else { - streamProxyService.del(app, stream); + StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app,stream); + if (streamProxyItem == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "代理不存在"); + } + streamProxyService.removeProxy(streamProxyItem.getId()); } } @@ -240,7 +244,7 @@ public class StreamProxyController { if (proxy.getId() <= 0) { throw new ControllerException(ErrorCode.ERROR400.getCode(), "缺少ID"); }else { - streamProxyService.delById(proxy.getId()); + streamProxyService.removeProxy(proxy.getId()); } } @@ -249,12 +253,14 @@ public class StreamProxyController { @Operation(summary = "启用代理", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "app", description = "应用名", required = true) @Parameter(name = "stream", description = "流id", required = true) - public void start(String app, String stream){ + public DeferredResult start(String app, String stream){ logger.info("启用代理: " + app + "/" + stream); - boolean result = streamProxyService.start(app, stream); - if (!result) { - throw new ControllerException(ErrorCode.ERROR100); - } + DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + streamProxyService.start(app, stream, (code, msg, data) -> { + WVPResult wvpResult = new WVPResult<>(code, msg, null); + result.setResult(wvpResult); + }); + return result; } @GetMapping(value = "/stop") @@ -262,12 +268,13 @@ public class StreamProxyController { @Operation(summary = "停用代理", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "app", description = "应用名", required = true) @Parameter(name = "stream", description = "流id", required = true) - public void stop(String app, String stream){ + public DeferredResult stop(String app, String stream){ logger.info("停用代理: " + app + "/" + stream); - boolean result = streamProxyService.stop(app, stream); - if (!result) { - logger.info("停用代理失败: " + app + "/" + stream); - throw new ControllerException(ErrorCode.ERROR100); - } + DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + streamProxyService.stop(app, stream, (code, msg, data) -> { + WVPResult wvpResult = new WVPResult<>(code, msg, null); + result.setResult(wvpResult); + }); + return result; } }