临时提交
parent
0d0b31f715
commit
87629b7fc7
|
@ -771,7 +771,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
||||||
redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
|
redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
|
||||||
// 设置超时
|
// 设置超时
|
||||||
dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
|
dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
|
||||||
redisRpcService.stopWaitePushStreamOnline(sendRtpItem.getRedisKey(), sendRtpItem);
|
redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
|
||||||
logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
|
logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
|
||||||
try {
|
try {
|
||||||
responseAck(request, Response.REQUEST_TIMEOUT); // 超时
|
responseAck(request, Response.REQUEST_TIMEOUT); // 超时
|
||||||
|
@ -834,7 +834,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
||||||
redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
|
redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
|
||||||
if (response.getCode() != 0) {
|
if (response.getCode() != 0) {
|
||||||
dynamicTask.stop(sendRtpItem.getCallId());
|
dynamicTask.stop(sendRtpItem.getCallId());
|
||||||
redisRpcService.stopWaitePushStreamOnline(sendRtpItem.getRedisKey(), sendRtpItem);
|
redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
|
||||||
redisRpcService.removeCallback(key);
|
redisRpcService.removeCallback(key);
|
||||||
try {
|
try {
|
||||||
responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
|
responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
|
||||||
|
|
|
@ -14,7 +14,7 @@ public interface IRedisRpcService {
|
||||||
|
|
||||||
long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback);
|
long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback);
|
||||||
|
|
||||||
void stopWaitePushStreamOnline(String sendRtpItemKey, SendRtpItem sendRtpItem);
|
void stopWaitePushStreamOnline(SendRtpItem sendRtpItem);
|
||||||
|
|
||||||
void rtpSendStopped(String sendRtpItemKey);
|
void rtpSendStopped(String sendRtpItemKey);
|
||||||
|
|
||||||
|
|
|
@ -174,21 +174,15 @@ public class RedisRpcController {
|
||||||
* 停止监听流上线
|
* 停止监听流上线
|
||||||
*/
|
*/
|
||||||
public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) {
|
public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) {
|
||||||
String sendRtpItemKey = request.getParam().toString();
|
SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class);
|
||||||
SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
|
|
||||||
if (sendRtpItem == null) {
|
|
||||||
logger.info("[redis-rpc] 停止监听流上线, 未找到redis中的发流信息, key:{}", sendRtpItemKey);
|
|
||||||
RedisRpcResponse response = request.getResponse();
|
|
||||||
response.setStatusCode(200);
|
|
||||||
return response;
|
|
||||||
}
|
|
||||||
logger.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
|
|
||||||
|
|
||||||
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
|
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
|
||||||
HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
|
HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
|
||||||
sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
|
sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
|
||||||
hookSubscribe.removeSubscribe(hook);
|
hookSubscribe.removeSubscribe(hook);
|
||||||
return null;
|
logger.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
|
||||||
|
RedisRpcResponse response = request.getResponse();
|
||||||
|
response.setStatusCode(200);
|
||||||
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -14,19 +14,14 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
|
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
|
||||||
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
|
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
|
||||||
import com.genersoft.iot.vmp.utils.SystemInfoUtils;
|
|
||||||
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
||||||
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
|
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
public class RedisRpcServiceImpl implements IRedisRpcService {
|
public class RedisRpcServiceImpl implements IRedisRpcService {
|
||||||
|
|
||||||
|
@ -131,12 +126,12 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stopWaitePushStreamOnline(String sendRtpItemKey, SendRtpItem sendRtpItem) {
|
public void stopWaitePushStreamOnline(SendRtpItem sendRtpItem) {
|
||||||
logger.info("[停止WVP监听流上线] {}/{}, key:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItemKey);
|
logger.info("[停止WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
|
||||||
HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
|
HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
|
||||||
sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
|
sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
|
||||||
hookSubscribe.removeSubscribe(hook);
|
hookSubscribe.removeSubscribe(hook);
|
||||||
RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItemKey);
|
RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem);
|
||||||
request.setToId(sendRtpItem.getServerId());
|
request.setToId(sendRtpItem.getServerId());
|
||||||
redisRpcConfig.request(request, 10);
|
redisRpcConfig.request(request, 10);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue