From ee590d6597594f502e82e9af6bd0d303d4ac6a37 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 12 Dec 2024 16:30:40 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0RPC=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E5=81=9C=E6=AD=A2=E5=AE=9E=E6=97=B6=E6=B5=81=E6=92=AD=E6=94=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/conf/redis/RedisRpcConfig.java | 3 +- .../service/IGbChannelPlayService.java | 12 +++- .../iot/vmp/gb28181/service/IPlayService.java | 3 + .../impl/GbChannelPlayServiceImpl.java | 51 +++++++++++++++++ .../gb28181/service/impl/PlayServiceImpl.java | 55 ++++++++++++------- .../redisMsg/IRedisRpcPlayService.java | 3 + .../RedisRpcChannelPlayController.java | 40 ++++++++++++++ .../service/RedisRpcPlayServiceImpl.java | 21 +++++++ .../service/IStreamPushPlayService.java | 2 + .../impl/StreamPushPlayServiceImpl.java | 5 ++ 10 files changed, 173 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java index 1ccaf011..308bb07f 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java @@ -17,6 +17,7 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; +import javax.sip.message.Response; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.HashMap; @@ -128,7 +129,7 @@ public class RedisRpcConfig implements MessageListener { if (method == null) { // 回复404结果 RedisRpcResponse response = request.getResponse(); - response.setStatusCode(404); + response.setStatusCode(Response.NOT_FOUND); sendResponse(response); return; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java index 4f577939..7658f635 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.service; +import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.InviteInfo; @@ -10,11 +11,20 @@ public interface IGbChannelPlayService { void start(CommonGBChannel channel, InviteInfo inviteInfo, Platform platform, ErrorCallback callback); - void play(CommonGBChannel channel, Platform platform, ErrorCallback callback); + void stopPlay(InviteSessionType type, CommonGBChannel channel, String stream); + + void play(CommonGBChannel channel, Platform platform, ErrorCallback callback); void playGbDeviceChannel(CommonGBChannel channel, ErrorCallback callback); + void stopPlayDeviceChannel(CommonGBChannel channel, String stream); + void playProxy(CommonGBChannel channel, ErrorCallback callback); + void stopPlayProxy(CommonGBChannel channel); + void playPush(CommonGBChannel channel, String platformDeviceId, String platformName, ErrorCallback callback); + + + void stopPlayPush(CommonGBChannel channel); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java index 4a1403a0..b65fc470 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java @@ -68,9 +68,12 @@ public interface IPlayService { void play(CommonGBChannel channel, ErrorCallback callback); + void stop(InviteSessionType inviteSessionType, CommonGBChannel channel, String stream); + void playBack(CommonGBChannel channel, Long startTime, Long stopTime, ErrorCallback callback); void download(CommonGBChannel channel, Long startTime, Long stopTime, Integer downloadSpeed, ErrorCallback callback); + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java index e0a736c7..618b72a1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.service.impl; +import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.InviteInfo; @@ -88,6 +89,24 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService { } } + @Override + public void stopPlay(InviteSessionType type, CommonGBChannel channel, String stream) { + if (channel.getGbDeviceDbId() != null) { + // 国标通道 + stopPlayDeviceChannel(channel, stream); + } else if (channel.getStreamProxyId() != null) { + // 拉流代理 + stopPlayProxy(channel); + } else if (channel.getStreamPushId() != null) { + // 推流 + stopPlayPush(channel); + } else { + // 通道数据异常 + log.error("[点播通用通道] 通道数据异常,无法识别通道来源: {}({})", channel.getGbName(), channel.getGbDeviceId()); + throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); + } + } + @Override public void play(CommonGBChannel channel, Platform platform, ErrorCallback callback) { if (channel.getGbDeviceDbId() != null) { @@ -124,6 +143,18 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService { } } + @Override + public void stopPlayDeviceChannel(CommonGBChannel channel, String stream) { + // 国标通道 + try { + deviceChannelPlayService.stop(InviteSessionType.PLAY, channel, stream); + } catch (Exception e) { + log.error("[停止点播失败] {}({})", channel.getGbName(), channel.getGbDeviceId(), e); + } + } + + + @Override public void playProxy(CommonGBChannel channel, ErrorCallback callback){ // 拉流代理通道 @@ -139,6 +170,16 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService { } } + @Override + public void stopPlayProxy(CommonGBChannel channel) { + // 拉流代理通道 + try { + streamProxyPlayService.stop(channel.getStreamProxyId()); + }catch (Exception e) { + log.error("[停止点播失败] {}({})", channel.getGbName(), channel.getGbDeviceId(), e); + } + } + @Override public void playPush(CommonGBChannel channel, String platformDeviceId, String platformName, ErrorCallback callback){ // 推流 @@ -152,6 +193,16 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService { } } + @Override + public void stopPlayPush(CommonGBChannel channel) { + // 推流 + try { + streamPushPlayService.stop(channel.getStreamPushId()); + }catch (Exception e) { + log.error("[停止点播失败] {}({})", channel.getGbName(), channel.getGbDeviceId(), e); + } + } + private void playbackGbDeviceChannel(CommonGBChannel channel, Long startTime, Long stopTime, ErrorCallback callback){ try { deviceChannelPlayService.playBack(channel, startTime, stopTime, callback); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index f6392ba2..70bd6d81 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -1606,30 +1606,34 @@ public class PlayServiceImpl implements IPlayService { @Override public void stop(InviteSessionType type, Device device, DeviceChannel channel, String stream) { - InviteInfo inviteInfo = inviteStreamService.getInviteInfo(type, channel.getId(), stream); - if (inviteInfo == null) { - if (type == InviteSessionType.PLAY) { + if (!userSetting.getServerId().equals(device.getServerId())) { + redisRpcPlayService.stop(device.getServerId(), type, channel.getId(), stream); + }else { + InviteInfo inviteInfo = inviteStreamService.getInviteInfo(type, channel.getId(), stream); + if (inviteInfo == null) { + if (type == InviteSessionType.PLAY) { + deviceChannelService.stopPlay(channel.getId()); + } + return; + } + inviteStreamService.removeInviteInfo(inviteInfo); + if (InviteSessionStatus.ok == inviteInfo.getStatus()) { + try { + log.info("[停止点播/回放/下载] {}/{}", device.getDeviceId(), channel.getDeviceId()); + cmder.streamByeCmd(device, channel.getDeviceId(), inviteInfo.getStream(), null, null); + } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { + log.error("[命令发送失败] 停止点播/回放/下载, 发送BYE: {}", e.getMessage()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); + } + } + + if (inviteInfo.getType() == InviteSessionType.PLAY) { deviceChannelService.stopPlay(channel.getId()); } - return; - } - inviteStreamService.removeInviteInfo(inviteInfo); - if (InviteSessionStatus.ok == inviteInfo.getStatus()) { - try { - log.info("[停止点播/回放/下载] {}/{}", device.getDeviceId(), channel.getDeviceId()); - cmder.streamByeCmd(device, channel.getDeviceId(), inviteInfo.getStream(), null, null); - } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { - log.error("[命令发送失败] 停止点播/回放/下载, 发送BYE: {}", e.getMessage()); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); + if (inviteInfo.getStreamInfo() != null) { + receiveRtpServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getSsrcInfo()); } } - - if (inviteInfo.getType() == InviteSessionType.PLAY) { - deviceChannelService.stopPlay(channel.getId()); - } - if (inviteInfo.getStreamInfo() != null) { - receiveRtpServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getSsrcInfo()); - } } @Override @@ -1674,6 +1678,17 @@ public class PlayServiceImpl implements IPlayService { play(device, deviceChannel, callback); } + @Override + public void stop(InviteSessionType inviteSessionType, CommonGBChannel channel, String stream) { + Device device = deviceService.getDevice(channel.getGbDeviceDbId()); + if (device == null) { + log.warn("[停止播放] 未找到通道{}的设备信息", channel); + throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); + } + DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(channel.getGbId()); + stop(InviteSessionType.PLAY, device, deviceChannel, stream); + } + @Override public void playBack(CommonGBChannel channel, Long startTime, Long stopTime, ErrorCallback callback) { if (startTime == null || stopTime == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java index 1ae1268c..42957aa5 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.service.redisMsg; +import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.service.bean.ErrorCallback; @@ -7,4 +8,6 @@ public interface IRedisRpcPlayService { void play(String serverId, Integer channelId, ErrorCallback callback); + + void stop(String serverId, InviteSessionType type, int channelId, String stream); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java index bcd58c3e..587f2198 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java @@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.service.redisMsg.control; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; @@ -79,4 +81,42 @@ public class RedisRpcChannelPlayController extends RpcController { return null; } + + /** + * 停止点播国标设备 + */ + @RedisRpcMapping("stop") + public RedisRpcResponse stop(RedisRpcRequest request) { + System.out.println(request.getParam().toString()); + JSONObject jsonObject = JSONObject.parseObject(request.getParam().toString()); + + RedisRpcResponse response = request.getResponse(); + + Integer channelId = jsonObject.getIntValue("channelId"); + if (channelId == null || channelId <= 0) { + response.setStatusCode(Response.BAD_REQUEST); + response.setBody("param error"); + return response; + } + + String stream = jsonObject.getString("stream"); + InviteSessionType type = jsonObject.getObject("inviteSessionType", InviteSessionType.class); + + // 获取对应的设备和通道信息 + CommonGBChannel channel = channelService.getOne(channelId); + if (channel == null) { + response.setStatusCode(Response.BAD_REQUEST); + response.setBody("param error"); + return response; + } + try { + channelPlayService.stopPlay(type, channel, stream); + response.setStatusCode(Response.OK); + }catch (Exception e){ + response.setStatusCode(Response.SERVER_INTERNAL_ERROR); + response.setBody(e.getMessage()); + } + return response; + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java index 0e1c0b1e..f2b1b74e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java @@ -1,8 +1,11 @@ package com.genersoft.iot.vmp.service.redisMsg.service; import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; @@ -51,5 +54,23 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService { } } } + + @Override + public void stop(String serverId, InviteSessionType type, int channelId, String stream) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("channelId", channelId); + jsonObject.put("stream", stream); + jsonObject.put("inviteSessionType", type); + RedisRpcRequest request = buildRequest("channel/stop", jsonObject.toJSONString()); + request.setToId(serverId); + RedisRpcResponse response = redisRpcConfig.request(request, 50); + if (response == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg()); + }else { + if (response.getStatusCode() != Response.OK) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg()); + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java index 1881c19b..9b75ffc7 100644 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java @@ -5,4 +5,6 @@ import com.genersoft.iot.vmp.service.bean.ErrorCallback; public interface IStreamPushPlayService { void start(Integer id, ErrorCallback callback, String platformDeviceId, String platformName ); + + void stop(Integer streamPushId); } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java index 6fa3d65d..d56cc286 100644 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java @@ -98,4 +98,9 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService { } }); } + + @Override + public void stop(Integer streamPushId) { + // 推流无需主动停止 + } }