临时提交

2.7.0
648540858 2024-04-18 17:34:00 +08:00
parent 3639d2224d
commit 9adf12ca6a
3 changed files with 10 additions and 3 deletions

View File

@ -88,7 +88,7 @@ public class RedisRpcConfig implements MessageListener {
if (userSetting.getServerId().equals(request.getFromId())) { if (userSetting.getServerId().equals(request.getFromId())) {
return; return;
} }
logger.info("[redis-rpc] >> {}", request); logger.info("[redis-rpc] << {}", request);
Method method = getMethod(request.getUri()); Method method = getMethod(request.getUri());
// 没有携带目标ID的可以理解为哪个wvp有结果就哪个回复携带目标ID但是如果是不存在的uri则直接回复404 // 没有携带目标ID的可以理解为哪个wvp有结果就哪个回复携带目标ID但是如果是不存在的uri则直接回复404
if (userSetting.getServerId().equals(request.getToId())) { if (userSetting.getServerId().equals(request.getToId())) {
@ -130,6 +130,7 @@ public class RedisRpcConfig implements MessageListener {
} }
private void sendResponse(RedisRpcResponse response){ private void sendResponse(RedisRpcResponse response){
logger.info("[redis-rpc] >> {}", response);
response.setToId(userSetting.getServerId()); response.setToId(userSetting.getServerId());
RedisRpcMessage message = new RedisRpcMessage(); RedisRpcMessage message = new RedisRpcMessage();
message.setResponse(response); message.setResponse(response);
@ -137,6 +138,7 @@ public class RedisRpcConfig implements MessageListener {
} }
private void sendRequest(RedisRpcRequest request){ private void sendRequest(RedisRpcRequest request){
logger.info("[redis-rpc] >> {}", request);
RedisRpcMessage message = new RedisRpcMessage(); RedisRpcMessage message = new RedisRpcMessage();
message.setRequest(request); message.setRequest(request);
redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message); redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);

View File

@ -164,6 +164,7 @@ public class RedisRpcController {
response.setStatusCode(200); response.setStatusCode(200);
// 手动发送结果 // 手动发送结果
sendResponse(response); sendResponse(response);
hookSubscribe.removeSubscribe(hook);
}); });
return null; return null;
@ -300,6 +301,7 @@ public class RedisRpcController {
} }
private void sendResponse(RedisRpcResponse response){ private void sendResponse(RedisRpcResponse response){
logger.info("[redis-rpc] >> {}", response);
response.setToId(userSetting.getServerId()); response.setToId(userSetting.getServerId());
RedisRpcMessage message = new RedisRpcMessage(); RedisRpcMessage message = new RedisRpcMessage();
message.setResponse(response); message.setResponse(response);

View File

@ -89,6 +89,8 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem);
request.setToId(sendRtpItem.getServerId());
hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
// 读取redis中的上级点播信息生成sendRtpItm发送出去 // 读取redis中的上级点播信息生成sendRtpItm发送出去
@ -105,9 +107,9 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
callback.run(sendRtpItem.getRedisKey()); callback.run(sendRtpItem.getRedisKey());
} }
hookSubscribe.removeSubscribe(hook); hookSubscribe.removeSubscribe(hook);
redisRpcConfig.removeCallback(request.getSn());
}); });
RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem);
request.setToId(sendRtpItem.getServerId());
redisRpcConfig.request(request, response -> { redisRpcConfig.request(request, response -> {
if (response.getBody() == null) { if (response.getBody() == null) {
logger.info("[请求所有WVP监听流上线] 流上线,但是未找到发流信息:{}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); logger.info("[请求所有WVP监听流上线] 流上线,但是未找到发流信息:{}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
@ -118,6 +120,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
if (callback != null) { if (callback != null) {
callback.run(response.getBody().toString()); callback.run(response.getBody().toString());
} }
hookSubscribe.removeSubscribe(hook);
}); });
} }