diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index ff84fc43..89384688 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -780,7 +780,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } }, userSetting.getPlatformPlayTimeout()); // - redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> { + long key = redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> { dynamicTask.stop(sendRtpItem.getCallId()); if (sendRtpItemKey == null) { logger.warn("[级联点播] 等待推流得到结果未空: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); @@ -835,6 +835,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (response.getCode() != 0) { dynamicTask.stop(sendRtpItem.getCallId()); redisRpcService.stopWaitePushStreamOnline(sendRtpItem.getRedisKey(), sendRtpItem); + redisRpcService.removeCallback(key); try { responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); } catch (SipException | InvalidArgumentException | ParseException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java index 70d53bc8..a50ddf09 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -12,10 +12,11 @@ public interface IRedisRpcService { WVPResult stopSendRtp(String sendRtpItemKey); - void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback callback); + long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback callback); void stopWaitePushStreamOnline(String sendRtpItemKey, SendRtpItem sendRtpItem); void rtpSendStopped(String sendRtpItemKey); + void removeCallback(long key); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index 75fc1cae..b70e4f9d 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -84,7 +84,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { } @Override - public void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback callback) { + public long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback callback) { logger.info("[请求所有WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( @@ -122,7 +122,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { } hookSubscribe.removeSubscribe(hook); }); - + return request.getSn(); } @Override @@ -147,4 +147,9 @@ public class RedisRpcServiceImpl implements IRedisRpcService { request.setToId(sendRtpItem.getServerId()); redisRpcConfig.request(request, 10); } + + @Override + public void removeCallback(long key) { + redisRpcConfig.removeCallback(key); + } }