diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java index 435341d0..b1d3a867 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java @@ -24,4 +24,9 @@ public interface IRedisRpcPlayService { String frontEndCommand(String serverId, Integer channelId, int cmdCode, int parameter1, int parameter2, int combindCode2); + void playPush(Integer id, ErrorCallback callback); + + StreamInfo playProxy(String serverId, int id); + + void stopProxy(String serverId, int id); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java index 84511b69..41464574 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -3,7 +3,6 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; -import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; public interface IRedisRpcService { @@ -29,6 +28,4 @@ public interface IRedisRpcService { void subscribeMobilePosition(int id, int cycle, int interval); - void play(Integer id, ErrorCallback callback); - } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamProxyController.java new file mode 100644 index 00000000..55764c53 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamProxyController.java @@ -0,0 +1,95 @@ +package com.genersoft.iot.vmp.service.redisMsg.control; + +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; +import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController; +import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping; +import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController; +import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; +import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService; +import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +@RedisRpcController("streamProxy") +public class RedisRpcStreamProxyController extends RpcController { + + @Autowired + private UserSetting userSetting; + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private IStreamProxyPlayService streamProxyPlayService; + + @Autowired + private IStreamProxyService streamProxyService; + + + private void sendResponse(RedisRpcResponse response){ + log.info("[redis-rpc] >> {}", response); + response.setToId(userSetting.getServerId()); + RedisRpcMessage message = new RedisRpcMessage(); + message.setResponse(response); + redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message); + } + + /** + * 播放 + */ + @RedisRpcMapping("play") + public RedisRpcResponse play(RedisRpcRequest request) { + int id = Integer.parseInt(request.getParam().toString()); + RedisRpcResponse response = request.getResponse(); + if (id <= 0) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + StreamProxy streamProxy = streamProxyService.getStreamProxy(id); + if (streamProxy == null) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + StreamInfo streamInfo = streamProxyPlayService.startProxy(streamProxy); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(JSONObject.toJSONString(streamInfo)); + return response; + } + + /** + * 停止 + */ + @RedisRpcMapping("stop") + public RedisRpcResponse stop(RedisRpcRequest request) { + int id = Integer.parseInt(request.getParam().toString()); + RedisRpcResponse response = request.getResponse(); + if (id <= 0) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + StreamProxy streamProxy = streamProxyService.getStreamProxy(id); + if (streamProxy == null) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + streamProxyPlayService.stopProxy(streamProxy); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + return response; + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java index 99466b1f..7989a4d5 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java @@ -189,5 +189,42 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService { } return null; } + + @Override + public void playPush(Integer id, ErrorCallback callback) { + RedisRpcRequest request = buildRequest("streamPush/play", id); + RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout()); + if (response == null) { + callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null); + }else { + if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) { + StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class); + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + }else { + callback.run(response.getStatusCode(), response.getBody().toString(), null); + } + } + } + + @Override + public StreamInfo playProxy(String serverId, int id) { + RedisRpcRequest request = buildRequest("streamProxy/play", id); + RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout()); + if (response != null && response.getStatusCode() == ErrorCode.SUCCESS.getCode()) { + return JSON.parseObject(response.getBody().toString(), StreamInfo.class); + } + return null; + } + + @Override + public void stopProxy(String serverId, int id) { + RedisRpcRequest request = buildRequest("streamProxy/stop", id); + RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout()); + if (response != null && response.getStatusCode() == ErrorCode.SUCCESS.getCode()) { + log.info("[rpc 拉流代理] 停止成功: id: {}", id); + }else { + log.info("[rpc 拉流代理] 停止失败 id: {}", id); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index 12bd7656..807fd8bc 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -223,20 +223,4 @@ public class RedisRpcServiceImpl implements IRedisRpcService { RedisRpcRequest request = buildRequest("device/subscribeMobilePosition", jsonObject); redisRpcConfig.request(request, 10); } - - @Override - public void play(Integer id, ErrorCallback callback) { - RedisRpcRequest request = buildRequest("streamPush/play", id); - RedisRpcResponse response = redisRpcConfig.request(request, 10); - if (response == null) { - callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null); - }else { - if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) { - StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class); - callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); - }else { - callback.run(response.getStatusCode(), response.getBody().toString(), null); - } - } - } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java index b93e4dc3..8651eea2 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper; import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService; @@ -31,6 +32,9 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService { @Autowired private UserSetting userSetting; + @Autowired + private IRedisRpcPlayService redisRpcPlayService; + @Override public StreamInfo start(int id) { StreamProxy streamProxy = streamProxyMapper.select(id); @@ -47,7 +51,7 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService { return null; } if (!userSetting.getServerId().equals(streamProxy.getServerId())) { - return redisRpcService.play(id, callback); + return redisRpcPlayService.playProxy(streamProxy.getServerId(), streamProxy.getId()); } MediaServer mediaServer; @@ -74,6 +78,9 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService { if (streamProxy == null) { throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到"); } + if (!userSetting.getServerId().equals(streamProxy.getServerId())) { + redisRpcPlayService.stopProxy(streamProxy.getServerId(), streamProxy.getId()); + } stopProxy(streamProxy); } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java index d1fdb82a..16fdc4cf 100644 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java @@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -45,6 +46,9 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService { @Autowired private IRedisRpcService redisRpcService; + @Autowired + private IRedisRpcPlayService redisRpcPlayService; + @Autowired private RedisPushStreamResponseListener redisPushStreamResponseListener; @@ -54,7 +58,7 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService { Assert.notNull(streamPush, "推流信息未找到"); if (!userSetting.getServerId().equals(streamPush.getServerId())) { - redisRpcService.play(id, callback); + redisRpcPlayService.playPush(id, callback); return; }