From 55a240bb4504baa9a75e44bc6cc597c96b80705d Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 18 Apr 2024 15:52:34 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/SendRtpItem.java | 21 +++- .../request/impl/AckRequestProcessor.java | 4 +- .../request/impl/ByeRequestProcessor.java | 2 +- .../request/impl/InviteRequestProcessor.java | 41 +++++-- .../vmp/media/zlm/ZLMHttpHookListener.java | 2 +- .../service/redisMsg/IRedisRpcService.java | 12 +-- .../redisMsg/control/RedisRpcController.java | 100 +++++++++++++----- .../redisMsg/service/RedisRpcServiceImpl.java | 58 +++++++--- .../storager/impl/RedisCatchStorageImpl.java | 14 +-- src/main/resources/application.yml | 2 +- 10 files changed, 176 insertions(+), 80 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index f1744d1e..65c33fef 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.gb28181.bean; +import com.genersoft.iot.vmp.common.VideoManagerConstants; + public class SendRtpItem { /** @@ -89,7 +91,7 @@ public class SendRtpItem { /** * invite 的 callId */ - private String CallId; + private String callId; /** * invite 的 fromTag @@ -242,11 +244,11 @@ public class SendRtpItem { } public String getCallId() { - return CallId; + return callId; } public void setCallId(String callId) { - CallId = callId; + this.callId = callId; } public InviteStreamType getPlayType() { @@ -364,7 +366,7 @@ public class SendRtpItem { ", localPort=" + localPort + ", mediaServerId='" + mediaServerId + '\'' + ", serverId='" + serverId + '\'' + - ", CallId='" + CallId + '\'' + + ", CallId='" + callId + '\'' + ", fromTag='" + fromTag + '\'' + ", toTag='" + toTag + '\'' + ", pt=" + pt + @@ -376,4 +378,15 @@ public class SendRtpItem { ", sessionName='" + sessionName + '\'' + '}'; } + + public String getRedisKey() { + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + + serverId + "_" + + mediaServerId + "_" + + platformId + "_" + + channelId + "_" + + stream + "_" + + callId; + return key; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 383e0427..fabac364 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -96,7 +96,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In logger.info("[收到ACK]: 来自->{}", fromUserId); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); if (sendRtpItem == null) { - logger.warn("[收到ACK]:未找到来自{},目标为({})的推流信息",fromUserId, toUserId); + logger.warn("[收到ACK]:未找到来自{},callId: {}", fromUserId, callIdHeader.getCallId()); return; } // tcp主动时,此时是级联下级平台,在回复200ok时,本地已经请求zlm开启监听,跳过下面步骤 @@ -117,7 +117,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In if (parentPlatform != null) { Map param = getSendRtpParam(sendRtpItem); if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { - WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem); + WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem.getRedisKey(), sendRtpItem); if (wvpResult.getCode() == 0) { MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(), diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 805173a0..5b8bad43 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -137,7 +137,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In if (platform != null) { redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform); if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { - redisRpcService.stopSendRtp(sendRtpItem); + redisRpcService.stopSendRtp(sendRtpItem.getRedisKey()); redisCatchStorage.deleteSendRTPServer(null, null, sendRtpItem.getCallId(), null); }else { MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 51c5b697..ff84fc43 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -44,6 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.sdp.*; @@ -85,6 +86,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private IRedisRpcService redisRpcService; + @Autowired + private RedisTemplate redisTemplate; + @Autowired private SSRCFactory ssrcFactory; @@ -604,6 +608,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements StreamPushItem transform = streamPushService.transform(pushListItem); transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); + redisCatchStorage.updateSendRTPSever(sendRtpItem); // 开始推流 sendPushStream(sendRtpItem, mediaServerItem, platform, request); }else { @@ -766,7 +771,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); // 设置超时 dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { - redisRpcService.stopWaitePushStreamOnline(sendRtpItem); + redisRpcService.stopWaitePushStreamOnline(sendRtpItem.getRedisKey(), sendRtpItem); logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream()); try { responseAck(request, Response.REQUEST_TIMEOUT); // 超时 @@ -775,8 +780,27 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } }, userSetting.getPlatformPlayTimeout()); // - redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemFromRedis) -> { + redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> { dynamicTask.stop(sendRtpItem.getCallId()); + if (sendRtpItemKey == null) { + logger.warn("[级联点播] 等待推流得到结果未空: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("未处理的异常 ", e); + } + return; + } + SendRtpItem sendRtpItemFromRedis = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey); + if (sendRtpItemFromRedis == null) { + logger.warn("[级联点播] 等待推流, 未找到redis中缓存的发流信息: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("未处理的异常 ", e); + } + return; + } if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { logger.info("[级联点播] 等待的推流在本平台上线 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); int localPort = sendRtpPortManager.getNextPort(mediaServerItem); @@ -784,11 +808,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足"); try { responseAck(request, Response.BUSY_HERE); - } catch (SipException e) { - logger.error("未处理的异常 ", e); - } catch (InvalidArgumentException e) { - logger.error("未处理的异常 ", e); - } catch (ParseException e) { + } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("未处理的异常 ", e); } return; @@ -814,7 +834,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { if (response.getCode() != 0) { dynamicTask.stop(sendRtpItem.getCallId()); - redisRpcService.stopWaitePushStreamOnline(sendRtpItem); + redisRpcService.stopWaitePushStreamOnline(sendRtpItem.getRedisKey(), sendRtpItem); try { responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); } catch (SipException | InvalidArgumentException | ParseException e) { @@ -831,7 +851,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements */ private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) { logger.info("[级联点播] 来自其他wvp的推流 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); - sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem); + sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem.getRedisKey()); + if (sendRtpItem == null) { + return; + } // 写入redis, 超时时回复 sendRtpItem.setStatus(1); SIPResponse response = sendStreamAck(request, sendRtpItem, platform); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index b68770e3..4d295329 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -539,7 +539,7 @@ public class ZLMHttpHookListener { } }else { // 通知其他wvp停止发流 - redisRpcService.rtpSendStopped(sendRtpItem); + redisRpcService.rtpSendStopped(sendRtpItem.getRedisKey()); } } catch (SipException | InvalidArgumentException | ParseException | SsrcTransactionNotFoundException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java index 8d1b7f08..70d53bc8 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -6,16 +6,16 @@ import com.genersoft.iot.vmp.vmanager.bean.WVPResult; public interface IRedisRpcService { - SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem); + SendRtpItem getSendRtpItem(String sendRtpItemKey); - WVPResult startSendRtp(SendRtpItem sendRtpItem); + WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem); - void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback callback); + WVPResult stopSendRtp(String sendRtpItemKey); - WVPResult stopSendRtp(SendRtpItem sendRtpItem); + void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback callback); - void stopWaitePushStreamOnline(SendRtpItem sendRtpItem); + void stopWaitePushStreamOnline(String sendRtpItemKey, SendRtpItem sendRtpItem); - void rtpSendStopped(SendRtpItem sendRtpItem); + void rtpSendStopped(String sendRtpItemKey); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java index 69e942fd..b61615bd 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.service.redisMsg.control; -import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; @@ -21,6 +20,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +78,14 @@ public class RedisRpcController { * 获取发流的信息 */ public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) { - SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + String sendRtpItemKey = request.getParam().toString(); + SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); + if (sendRtpItem == null) { + logger.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息, key:{}", sendRtpItemKey); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + return response; + } logger.info("[redis-rpc] 获取发流的信息: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); // 查询本级是否有这个流 MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); @@ -103,9 +110,10 @@ public class RedisRpcController { sendRtpItem.setSsrc(ssrc); } redisCatchStorage.updateSendRTPSever(sendRtpItem); + redisTemplate.opsForValue().set(sendRtpItemKey, sendRtpItem); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); - response.setBody(sendRtpItem); + response.setBody(sendRtpItemKey); return response; } @@ -113,14 +121,25 @@ public class RedisRpcController { * 监听流上线 */ public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) { - SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class); logger.info("[redis-rpc] 监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); // 查询本级是否有这个流 MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); if (mediaServerItem != null) { logger.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); + // 读取redis中的上级点播信息,生成sendRtpItm发送出去 + if (sendRtpItem.getSsrc() == null) { + // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } + sendRtpItem.setMediaServerId(mediaServerItem.getId()); + sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); + sendRtpItem.setServerId(userSetting.getServerId()); + + redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); RedisRpcResponse response = request.getResponse(); - response.setBody(sendRtpItem); + response.setBody(sendRtpItem.getRedisKey()); response.setStatusCode(200); } // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 @@ -139,8 +158,9 @@ public class RedisRpcController { sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); sendRtpItem.setServerId(userSetting.getServerId()); + redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); RedisRpcResponse response = request.getResponse(); - response.setBody(sendRtpItem); + response.setBody(sendRtpItem.getRedisKey()); response.setStatusCode(200); // 手动发送结果 sendResponse(response); @@ -153,7 +173,14 @@ public class RedisRpcController { * 停止监听流上线 */ public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) { - SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + String sendRtpItemKey = request.getParam().toString(); + SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); + if (sendRtpItem == null) { + logger.info("[redis-rpc] 停止监听流上线, 未找到redis中的发流信息, key:{}", sendRtpItemKey); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + return response; + } logger.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 @@ -168,24 +195,33 @@ public class RedisRpcController { * 开始发流 */ public RedisRpcResponse startSendRtp(RedisRpcRequest request) { - SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + String sendRtpItemKey = request.getParam().toString(); + SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + if (sendRtpItem == null) { + logger.info("[redis-rpc] 开始发流, 未找到redis中的发流信息, key:{}", sendRtpItemKey); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息"); + response.setBody(wvpResult); + return response; + } logger.info("[redis-rpc] 开始发流: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaServerItem == null) { logger.info("[redis-rpc] startSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() ); - RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer"); + response.setBody(wvpResult); + return response; } Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); if (!streamReady) { logger.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); - RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "流不在线"); + response.setBody(wvpResult); + return response; } JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem); - RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); if (jsonObject.getInteger("code") == 0) { logger.info("[redis-rpc] 发流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); WVPResult wvpResult = WVPResult.success(); @@ -202,43 +238,51 @@ public class RedisRpcController { * 停止发流 */ public RedisRpcResponse stopSendRtp(RedisRpcRequest request) { - SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + String sendRtpItemKey = request.getParam().toString(); + SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + if (sendRtpItem == null) { + logger.info("[redis-rpc] 停止推流, 未找到redis中的发流信息, key:{}", sendRtpItemKey); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息"); + response.setBody(wvpResult); + return response; + } logger.info("[redis-rpc] 停止推流: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaServerItem == null) { logger.info("[redis-rpc] stopSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() ); - RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer"); + response.setBody(wvpResult); + return response; } JSONObject jsonObject = zlmServerFactory.stopSendRtpStream(mediaServerItem, sendRtpItem); - RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); if (jsonObject.getInteger("code") == 0) { logger.info("[redis-rpc] 停止推流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); - WVPResult wvpResult = WVPResult.success(); - response.setBody(wvpResult); + response.setBody(WVPResult.success()); + return response; }else { int code = jsonObject.getInteger("code"); String msg = jsonObject.getString("msg"); logger.info("[redis-rpc] 停止推流失败: {}/{}, 目标地址: {}:{}, code: {}, msg: {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), code, msg ); - WVPResult wvpResult = WVPResult.fail(code, msg); - response.setBody(wvpResult); + response.setBody(WVPResult.fail(code, msg)); + return response; } - return response; } /** * 其他wvp通知推流已经停止了 */ public RedisRpcResponse rtpSendStopped(RedisRpcRequest request) { - SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); - logger.info("[redis-rpc] 推流已经停止: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); - SendRtpItem sendRtpItemInCatch = redisCatchStorage.querySendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getStream(), sendRtpItem.getCallId()); + String sendRtpItemKey = request.getParam().toString(); + SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); - if (sendRtpItemInCatch == null) { + if (sendRtpItem == null) { + logger.info("[redis-rpc] 推流已经停止, 未找到redis中的发流信息, key:{}", sendRtpItemKey); return response; } + logger.info("[redis-rpc] 推流已经停止: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); String platformId = sendRtpItem.getPlatformId(); ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); if (platform == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index 9dcfadf2..f4c429c2 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -14,10 +14,12 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; @Service @@ -37,6 +39,9 @@ public class RedisRpcServiceImpl implements IRedisRpcService { @Autowired private SSRCFactory ssrcFactory; + @Autowired + private RedisTemplate redisTemplate; + private RedisRpcRequest buildRequest(String uri, Object param) { RedisRpcRequest request = new RedisRpcRequest(); request.setFromId(userSetting.getServerId()); @@ -46,32 +51,40 @@ public class RedisRpcServiceImpl implements IRedisRpcService { } @Override - public SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem) { - - RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItem); + public SendRtpItem getSendRtpItem(String sendRtpItemKey) { + RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItemKey); RedisRpcResponse response = redisRpcConfig.request(request, 10); - return JSON.parseObject(response.getBody().toString(), SendRtpItem.class); + if (response.getBody() == null) { + return null; + } + return (SendRtpItem)redisTemplate.opsForValue().get(response.getBody().toString()); } @Override - public WVPResult startSendRtp(SendRtpItem sendRtpItem) { + public WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem) { logger.info("[请求其他WVP] 开始推流,wvp:{}, {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream()); - RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItem); + RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItemKey); request.setToId(sendRtpItem.getServerId()); RedisRpcResponse response = redisRpcConfig.request(request, 10); return JSON.parseObject(response.getBody().toString(), WVPResult.class); } @Override - public WVPResult stopSendRtp(SendRtpItem sendRtpItem) { - RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItem); + public WVPResult stopSendRtp(String sendRtpItemKey) { + SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey); + if (sendRtpItem == null) { + logger.info("[请求其他WVP] 停止推流, 未找到redis中的发流信息, key:{}", sendRtpItemKey); + return WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到发流信息"); + } + logger.info("[请求其他WVP] 停止推流,wvp:{}, {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream()); + RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItemKey); request.setToId(sendRtpItem.getServerId()); RedisRpcResponse response = redisRpcConfig.request(request, 10); return JSON.parseObject(response.getBody().toString(), WVPResult.class); } @Override - public void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback callback) { + public void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback callback) { logger.info("[请求所有WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( @@ -87,36 +100,47 @@ public class RedisRpcServiceImpl implements IRedisRpcService { sendRtpItem.setMediaServerId(mediaServerItemInUse.getId()); sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); sendRtpItem.setServerId(userSetting.getServerId()); + redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); if (callback != null) { - callback.run(sendRtpItem); + callback.run(sendRtpItem.getRedisKey()); } hookSubscribe.removeSubscribe(hook); }); RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem); request.setToId(sendRtpItem.getServerId()); redisRpcConfig.request(request, response -> { - SendRtpItem sendRtpItemFromOther = JSON.parseObject(response.getBody().toString(), SendRtpItem.class); - logger.info("[请求所有WVP监听流上线] 流上线 {}/{}->{}", sendRtpItemFromOther.getApp(), sendRtpItemFromOther.getStream(), sendRtpItemFromOther); + if (response.getBody() == null) { + logger.info("[请求所有WVP监听流上线] 流上线,但是未找到发流信息:{}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + return; + } + logger.info("[请求所有WVP监听流上线] 流上线 {}/{}->{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.toString()); + if (callback != null) { - callback.run(sendRtpItemFromOther); + callback.run(response.getBody().toString()); } }); } @Override - public void stopWaitePushStreamOnline(SendRtpItem sendRtpItem) { + public void stopWaitePushStreamOnline(String sendRtpItemKey, SendRtpItem sendRtpItem) { + logger.info("[停止WVP监听流上线] {}/{}, key:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItemKey); HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); hookSubscribe.removeSubscribe(hook); - RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem); + RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItemKey); request.setToId(sendRtpItem.getServerId()); redisRpcConfig.request(request, 10); } @Override - public void rtpSendStopped(SendRtpItem sendRtpItem) { - RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItem); + public void rtpSendStopped(String sendRtpItemKey) { + SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey); + if (sendRtpItem == null) { + logger.info("[停止WVP监听流上线] 未找到redis中的发流信息, key:{}", sendRtpItemKey); + return; + } + RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItemKey); request.setToId(sendRtpItem.getServerId()); redisRpcConfig.request(request, 10); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 97e45732..b2909eee 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -141,15 +141,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public void updateSendRTPSever(SendRtpItem sendRtpItem) { - - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + - userSetting.getServerId() + "_" - + sendRtpItem.getMediaServerId() + "_" - + sendRtpItem.getPlatformId() + "_" - + sendRtpItem.getChannelId() + "_" - + sendRtpItem.getStream() + "_" - + sendRtpItem.getCallId(); - redisTemplate.opsForValue().set(key, sendRtpItem); + redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); } @Override @@ -186,7 +178,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { callId = "*"; } String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*_" + + "*_*_" + platformGbId + "_" + channelId + "_" + streamId + "_" @@ -292,7 +284,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { */ @Override public void deleteSendRTPServer(SendRtpItem sendRtpItem) { - deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),sendRtpItem.getCallId(), sendRtpItem.getServerId()); + deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),sendRtpItem.getCallId(), sendRtpItem.getStream()); } @Override diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 3f478442..69f947e2 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -2,4 +2,4 @@ spring: application: name: wvp profiles: - active: local \ No newline at end of file + active: local2 \ No newline at end of file