From 9aad6ac7194f94e75823328e827d11596a6487d0 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 8 Aug 2024 17:47:08 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/common/StreamInfo.java | 1 + .../media/service/IMediaServerService.java | 2 + .../service/impl/MediaServerServiceImpl.java | 13 ++ .../service/redisMsg/IRedisRpcService.java | 4 + .../redisMsg/control/RedisRpcController.java | 48 ++++++ .../redisMsg/service/RedisRpcServiceImpl.java | 50 ++++++ .../controller/StreamPushController.java | 6 +- .../service/IStreamPushPlayService.java | 4 +- .../impl/StreamPushPlayServiceImpl.java | 163 ++++-------------- 9 files changed, 159 insertions(+), 132 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java index 96a987c9..7a2e03cc 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java @@ -7,6 +7,7 @@ import io.swagger.v3.oas.annotations.media.Schema; import java.io.Serializable; import java.util.Objects; + @Schema(description = "流信息") public class StreamInfo implements Serializable, Cloneable{ diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java index 92b2946a..1a743b2f 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java @@ -158,4 +158,6 @@ public interface IMediaServerService { StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy); void stopProxy(MediaServer mediaServer, String streamKey); + + StreamInfo getMediaByAppAndStream(String app, String stream); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index f484a3c5..791c3420 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -909,6 +909,19 @@ public class MediaServerServiceImpl implements IMediaServerService { return null; } + @Override + public StreamInfo getMediaByAppAndStream(String app, String stream) { + + List mediaServerList = getAll(); + for (MediaServer mediaServer : mediaServerList) { + MediaInfo mediaInfo = getMediaInfo(mediaServer, app, stream); + if (mediaInfo != null) { + return getStreamInfoByAppAndStream(mediaServer, app, stream, mediaInfo, mediaInfo.getCallId()); + } + } + return null; + } + @Override public Long updateDownloadProcess(MediaServer mediaServer, String app, String stream) { IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java index b4bd72cf..29e4759f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; @@ -19,4 +20,7 @@ public interface IRedisRpcService { void rtpSendStopped(String sendRtpItemKey); void removeCallback(long key); + + long onStreamOnlineEvent(String app, String stream, CommonCallback callback); + void unPushStreamOnlineEvent(String app, String stream); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java index 3ba96e37..280904d8 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service.redisMsg.control; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; @@ -162,6 +163,39 @@ public class RedisRpcController { return null; } + /** + * 监听流上线 + */ + public RedisRpcResponse onPushStreamOnlineEvent(RedisRpcRequest request) { + StreamInfo streamInfo = JSONObject.parseObject(request.getParam().toString(), StreamInfo.class); + log.info("[redis-rpc] 监听流上线: {}/{}", streamInfo.getApp(), streamInfo.getStream()); + // 查询本级是否有这个流 + StreamInfo streamInfoInServer = mediaServerService.getMediaByAppAndStream(streamInfo.getApp(), streamInfo.getStream()); + if (streamInfoInServer != null) { + log.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}", streamInfo.getApp(), streamInfo.getStream()); + RedisRpcResponse response = request.getResponse(); + response.setBody(streamInfoInServer); + response.setStatusCode(200); + return response; + } + // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 + Hook hook = Hook.getInstance(HookType.on_media_arrival, streamInfo.getApp(), streamInfo.getStream(), null); + hookSubscribe.addSubscribe(hook, (hookData) -> { + log.info("[redis-rpc] 监听流上线,流已上线: {}/{}", streamInfo.getApp(), streamInfo.getStream()); + // 读取redis中的上级点播信息,生成sendRtpItm发送出去 + RedisRpcResponse response = request.getResponse(); + response.setBody(mediaServerService.getStreamInfoByAppAndStream(hookData.getMediaServer(), + streamInfo.getApp(), streamInfo.getStream(), hookData.getMediaInfo(), + hookData.getMediaInfo() != null ? hookData.getMediaInfo().getCallId() : null)); + response.setStatusCode(200); + // 手动发送结果 + sendResponse(response); + hookSubscribe.removeSubscribe(hook); + + }); + return null; + } + /** * 停止监听流上线 */ @@ -176,6 +210,20 @@ public class RedisRpcController { return response; } + /** + * 停止监听流上线 + */ + public RedisRpcResponse unPushStreamOnlineEvent(RedisRpcRequest request) { + StreamInfo streamInfo = JSONObject.parseObject(request.getParam().toString(), StreamInfo.class); + log.info("[redis-rpc] 停止监听流上线: {}/{}", streamInfo.getApp(), streamInfo.getStream()); + // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 + Hook hook = Hook.getInstance(HookType.on_media_arrival, streamInfo.getApp(), streamInfo.getStream(), null); + hookSubscribe.removeSubscribe(hook); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + return response; + } + /** * 开始发流 diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index 011c1327..46838e1f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service.redisMsg.service; import com.alibaba.fastjson2.JSON; import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; @@ -11,6 +12,7 @@ import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; +import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; @@ -38,6 +40,10 @@ public class RedisRpcServiceImpl implements IRedisRpcService { @Autowired private RedisTemplate redisTemplate; + + @Autowired + private IMediaServerService mediaServerService; + private RedisRpcRequest buildRequest(String uri, Object param) { RedisRpcRequest request = new RedisRpcRequest(); request.setFromId(userSetting.getServerId()); @@ -146,4 +152,48 @@ public class RedisRpcServiceImpl implements IRedisRpcService { public void removeCallback(long key) { redisRpcConfig.removeCallback(key); } + + @Override + public long onStreamOnlineEvent(String app, String stream, CommonCallback callback) { + + log.info("[请求所有WVP监听流上线] {}/{}", app, stream); + // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 + Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, null); + StreamInfo streamInfoParam = new StreamInfo(); + streamInfoParam.setApp(app); + streamInfoParam.setStream(stream); + RedisRpcRequest request = buildRequest("onPushStreamOnlineEvent", streamInfoParam); + hookSubscribe.addSubscribe(hook, (hookData) -> { + if (callback != null) { + callback.run(mediaServerService.getStreamInfoByAppAndStream(hookData.getMediaServer(), + app, stream, hookData.getMediaInfo(), + hookData.getMediaInfo() != null ? hookData.getMediaInfo().getCallId() : null)); + } + hookSubscribe.removeSubscribe(hook); + redisRpcConfig.removeCallback(request.getSn()); + }); + + redisRpcConfig.request(request, response -> { + if (response.getBody() == null) { + log.info("[请求所有WVP监听流上线] 流上线,但是未找到发流信息:{}/{}", app, stream); + return; + } + log.info("[请求所有WVP监听流上线] 流上线 {}/{}", app, stream); + + if (callback != null) { + callback.run((StreamInfo) response.getBody()); + } + hookSubscribe.removeSubscribe(hook); + }); + return request.getSn(); + } + + @Override + public void unPushStreamOnlineEvent(String app, String stream) { + StreamInfo streamInfoParam = new StreamInfo(); + streamInfoParam.setApp(app); + streamInfoParam.setStream(stream); + RedisRpcRequest request = buildRequest("unPushStreamOnlineEvent", streamInfoParam); + redisRpcConfig.request(request, 10); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/controller/StreamPushController.java b/src/main/java/com/genersoft/iot/vmp/streamPush/controller/StreamPushController.java index ad350245..3af1f294 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/controller/StreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/controller/StreamPushController.java @@ -251,12 +251,12 @@ public class StreamPushController { WVPResult fail = WVPResult.fail(ErrorCode.ERROR100.getCode(), "等待推流超时"); result.setResult(fail); }); - streamPushPlayService.start(id, streamInfo -> { - if (streamInfo != null) { + streamPushPlayService.start(id, (code, msg, streamInfo) -> { + if (code == 0 && streamInfo != null) { WVPResult success = WVPResult.success(new StreamContent(streamInfo)); result.setResult(success); } - }); + }, null, null); return result; } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java index 0157abbd..1881c19b 100644 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java @@ -1,8 +1,8 @@ package com.genersoft.iot.vmp.streamPush.service; -import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; public interface IStreamPushPlayService { - void start(Integer id, CommonCallback callback); + void start(Integer id, ErrorCallback callback, String platformDeviceId, String platformName ); } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java index 47703230..b590ed79 100644 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java @@ -1,40 +1,29 @@ package com.genersoft.iot.vmp.streamPush.service.impl; import com.baomidou.dynamic.datasource.annotation.DS; -import com.genersoft.iot.vmp.common.CommonCallback; -import com.genersoft.iot.vmp.common.InviteInfo; -import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; -import com.genersoft.iot.vmp.media.bean.MediaServer; -import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; -import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import com.genersoft.iot.vmp.service.bean.InviteErrorCode; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; +import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.streamPush.bean.StreamPush; import com.genersoft.iot.vmp.streamPush.dao.StreamPushMapper; import com.genersoft.iot.vmp.streamPush.service.IStreamPushPlayService; -import gov.nist.javax.sip.message.SIPResponse; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.ObjectUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import org.springframework.util.Assert; -import javax.sip.InvalidArgumentException; -import javax.sip.SipException; -import javax.sip.message.Response; -import java.text.ParseException; import java.util.UUID; @Service @@ -60,54 +49,22 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService { @Autowired private HookSubscribe subscribe; + @Autowired + private IRedisRpcService redisRpcService; + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private RedisPushStreamResponseListener redisPushStreamResponseListener; + + @Autowired + private SendRtpPortManager sendRtpPortManager; + @Override - public void start(Integer id, CommonCallback callback) { + public void start(Integer id, ErrorCallback callback, String platformDeviceId, String platformName ) { StreamPush streamPush = streamPushMapper.queryOne(id); Assert.notNull(streamPush, "推流信息未找到"); -// if (streamPush.isPushing() && streamPush.getMediaServerId() != null) { -// // 检查流是否准备就绪 -// MediaServer mediaServer = mediaServerService.getOne(streamPush.getMediaServerId()); -// if (mediaServer != null) { -// Boolean streamReady = mediaServerService.isStreamReady(mediaServer, streamPush.getApp(), streamPush.getStream()); -// if (streamReady != null && streamReady) { -// String callId = null; -// StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(streamPush.getApp(), streamPush.getStream()); -// if (streamAuthorityInfo != null) { -// callId = streamAuthorityInfo.getCallId(); -// } -// callback.run(mediaServerService.getStreamInfoByAppAndStream(mediaServer, -// streamPush.getApp(), streamPush.getStream(), null, callId)); -// return; -// } -// } -// } -// Assert.isTrue(streamPush.isAutoPushChannel(), "通道未推流"); -// // 发送redis消息,通知流上线 -// String timeOutTaskKey = UUID.randomUUID().toString(); -// Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, streamPush.getApp(), streamPush.getStream(), null); -// // 开启流上线监听 -// subscribe.addSubscribe(rtpHook, (hookData) -> { -// dynamicTask.stop(timeOutTaskKey); -// subscribe.removeSubscribe(rtpHook); -// if (hookData == null) { -// return; -// } -// String callId = null; -// StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(streamPush.getApp(), streamPush.getStream()); -// if (streamAuthorityInfo != null) { -// callId = streamAuthorityInfo.getCallId(); -// } -// callback.run(mediaServerService.getStreamInfoByAppAndStream(hookData.getMediaServer(), -// streamPush.getApp(), streamPush.getStream(), null, callId)); -// }); -// // 设置超时事件 -// dynamicTask.startDelay(timeOutTaskKey, () -> { -// // 取消流监听 -// subscribe.removeSubscribe(rtpHook); -// }, userSetting.getPlayTimeout()); - // 发送redis消息, 同时监听可能返回的拒绝消息 - - MediaArrivalEvent pushListItem = redisCatchStorage.getPushListItem(streamPush.getApp(), streamPush.getStream()); if (pushListItem != null) { String callId = null; @@ -115,7 +72,7 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService { if (streamAuthorityInfo != null) { callId = streamAuthorityInfo.getCallId(); } - callback.run(mediaServerService.getStreamInfoByAppAndStream(pushListItem.getMediaServer(), + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), mediaServerService.getStreamInfoByAppAndStream(pushListItem.getMediaServer(), streamPush.getApp(), streamPush.getStream(), null, callId)); return; } @@ -123,84 +80,36 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService { // 发送redis消息以使设备上线,流上线后被 log.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", streamPush.getApp(), streamPush.getStream()); MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, - streamPush.getApp(), streamPush.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(), - platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + streamPush.getApp(), streamPush.getStream(), streamPush.getGbDeviceId(), platformDeviceId, + platformName, userSetting.getServerId(), null); redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); // 设置超时 - dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { - redisRpcService.stopWaitePushStreamOnline(sendRtpItem); + String timeOutTaskKey = UUID.randomUUID().toString(); + dynamicTask.startDelay(timeOutTaskKey, () -> { + redisRpcService.unPushStreamOnlineEvent(streamPush.getApp(), streamPush.getStream()); log.info("[ app={}, stream={} ] 等待设备开始推流超时", streamPush.getApp(), streamPush.getStream()); - try { - responseAck(request, Response.REQUEST_TIMEOUT); // 超时 - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("未处理的异常 ", e); - } + callback.run(ErrorCode.ERROR100.getCode(), "timeout", null); + }, userSetting.getPlatformPlayTimeout()); // - long key = redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> { - dynamicTask.stop(sendRtpItem.getCallId()); - if (sendRtpItemKey == null) { + long key = redisRpcService.onStreamOnlineEvent(streamPush.getApp(), streamPush.getStream(), (streamInfo) -> { + dynamicTask.stop(timeOutTaskKey); + if (streamInfo == null) { log.warn("[级联点播] 等待推流得到结果未空: {}/{}", streamPush.getApp(), streamPush.getStream()); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("未处理的异常 ", e); - } - return; - } - SendRtpItem sendRtpItemFromRedis = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey); - if (sendRtpItemFromRedis == null) { - log.warn("[级联点播] 等待推流, 未找到redis中缓存的发流信息: {}/{}", streamPush.getApp(), streamPush.getStream()); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("未处理的异常 ", e); - } - return; - } - if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { - log.info("[级联点播] 等待的推流在本平台上线 {}/{}", streamPush.getApp(), streamPush.getStream()); - int localPort = sendRtpPortManager.getNextPort(mediaServerItem); - if (localPort == 0) { - log.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("未处理的异常 ", e); - } - return; - } - sendRtpItem.setLocalPort(localPort); - if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { - sendRtpItem.setLocalIp(platform.getSendStreamIp()); - } - - // 写入redis, 超时时回复 - sendRtpItem.setStatus(1); - SIPResponse response = sendStreamAck(request, sendRtpItem, platform); - if (response != null) { - sendRtpItem.setToTag(response.getToTag()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); - } else { - // 其他平台内容 - otherWvpPushStream(sendRtpItemFromRedis, request, platform); + callback.run(ErrorCode.ERROR100.getCode(), "fail", null); + }else { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); } }); // 添加回复的拒绝或者错误的通知 // redis消息例如: PUBLISH VM_MSG_STREAM_PUSH_RESPONSE '{"code":1,"msg":"失败","app":"1","stream":"2"}' redisPushStreamResponseListener.addEvent(streamPush.getApp(), streamPush.getStream(), response -> { if (response.getCode() != 0) { - dynamicTask.stop(sendRtpItem.getCallId()); - redisRpcService.stopWaitePushStreamOnline(sendRtpItem); + dynamicTask.stop(timeOutTaskKey); + redisRpcService.unPushStreamOnlineEvent(streamPush.getApp(), streamPush.getStream()); redisRpcService.removeCallback(key); - try { - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage()); - } + callback.run(response.getCode(), response.getMsg(), null); } }); - } }