diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 89384688..8e2f5038 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -771,7 +771,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); // 设置超时 dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { - redisRpcService.stopWaitePushStreamOnline(sendRtpItem.getRedisKey(), sendRtpItem); + redisRpcService.stopWaitePushStreamOnline(sendRtpItem); logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream()); try { responseAck(request, Response.REQUEST_TIMEOUT); // 超时 @@ -834,7 +834,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { if (response.getCode() != 0) { dynamicTask.stop(sendRtpItem.getCallId()); - redisRpcService.stopWaitePushStreamOnline(sendRtpItem.getRedisKey(), sendRtpItem); + redisRpcService.stopWaitePushStreamOnline(sendRtpItem); redisRpcService.removeCallback(key); try { responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java index a50ddf09..b4bd72cf 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -14,7 +14,7 @@ public interface IRedisRpcService { long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback callback); - void stopWaitePushStreamOnline(String sendRtpItemKey, SendRtpItem sendRtpItem); + void stopWaitePushStreamOnline(SendRtpItem sendRtpItem); void rtpSendStopped(String sendRtpItemKey); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java index 0d01c946..5a651b3b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java @@ -174,21 +174,15 @@ public class RedisRpcController { * 停止监听流上线 */ public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) { - String sendRtpItemKey = request.getParam().toString(); - SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey); - if (sendRtpItem == null) { - logger.info("[redis-rpc] 停止监听流上线, 未找到redis中的发流信息, key:{}", sendRtpItemKey); - RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); - return response; - } - logger.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); - + SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class); // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); hookSubscribe.removeSubscribe(hook); - return null; + logger.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + return response; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index 9b0a195c..8ffb5621 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -14,19 +14,14 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; -import com.genersoft.iot.vmp.utils.SystemInfoUtils; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import java.util.List; -import java.util.Map; - @Service public class RedisRpcServiceImpl implements IRedisRpcService { @@ -131,12 +126,12 @@ public class RedisRpcServiceImpl implements IRedisRpcService { } @Override - public void stopWaitePushStreamOnline(String sendRtpItemKey, SendRtpItem sendRtpItem) { - logger.info("[停止WVP监听流上线] {}/{}, key:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItemKey); + public void stopWaitePushStreamOnline(SendRtpItem sendRtpItem) { + logger.info("[停止WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); hookSubscribe.removeSubscribe(hook); - RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItemKey); + RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem); request.setToId(sendRtpItem.getServerId()); redisRpcConfig.request(request, 10); }