From 304e596440b4691dd9b7ead297b5244d6b210c35 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 12 Dec 2024 17:00:58 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96RPC=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E5=AE=9A=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/conf/redis/RedisRpcConfig.java | 13 ++++++++++--- .../redisMsg/service/RedisRpcPlayServiceImpl.java | 8 +++++--- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java index 308bb07f..02293685 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java @@ -171,19 +171,25 @@ public class RedisRpcConfig implements MessageListener { private final Map> topicSubscribers = new ConcurrentHashMap<>(); private final Map> callbacks = new ConcurrentHashMap<>(); - public RedisRpcResponse request(RedisRpcRequest request, int timeOut) { + public RedisRpcResponse request(RedisRpcRequest request, long timeOut) { + return request(request, timeOut, TimeUnit.SECONDS); + } + + public RedisRpcResponse request(RedisRpcRequest request, long timeOut, TimeUnit timeUnit) { request.setSn((long) random.nextInt(1000) + 1); SynchronousQueue subscribe = subscribe(request.getSn()); try { sendRequest(request); - return subscribe.poll(timeOut, TimeUnit.SECONDS); + return subscribe.poll(timeOut, timeUnit); } catch (InterruptedException e) { log.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e); + RedisRpcResponse redisRpcResponse = new RedisRpcResponse(); + redisRpcResponse.setStatusCode(Response.BUSY_HERE); + return redisRpcResponse; } finally { this.unsubscribe(request.getSn()); } - return null; } public void request(RedisRpcRequest request, CommonCallback callback) { @@ -236,6 +242,7 @@ public class RedisRpcConfig implements MessageListener { + // @Scheduled(fixedRate = 1000) //每1秒执行一次 // public void execute(){ // logger.info("callbacks的长度: " + callbacks.size()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java index f2b1b74e..3aa85909 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java @@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import lombok.extern.slf4j.Slf4j; @@ -17,6 +18,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.sip.message.Response; +import java.util.concurrent.TimeUnit; @Slf4j @Service @@ -42,13 +44,13 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService { public void play(String serverId, Integer channelId, ErrorCallback callback) { RedisRpcRequest request = buildRequest("channel/play", channelId); request.setToId(serverId); - RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout()); + RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.MICROSECONDS); if (response == null) { callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null); }else { if (response.getStatusCode() == Response.OK) { StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class); - callback.run(response.getStatusCode(), "success", streamInfo); + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); }else { callback.run(response.getStatusCode(), response.getBody().toString(), null); } @@ -63,7 +65,7 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService { jsonObject.put("inviteSessionType", type); RedisRpcRequest request = buildRequest("channel/stop", jsonObject.toJSONString()); request.setToId(serverId); - RedisRpcResponse response = redisRpcConfig.request(request, 50); + RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MICROSECONDS); if (response == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg()); }else {