From 8ef71b0f2d6dd5e9c7d2bb39b9766ec177a90898 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 11 Dec 2024 18:26:18 +0800 Subject: [PATCH] =?UTF-8?q?[redis=E4=B8=8B=E7=9A=84=E5=A4=9Awvp]=20?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=9C=8D=E5=8A=A1=E9=97=B4=E6=96=B9=E6=B3=95?= =?UTF-8?q?=E8=B0=83=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/conf/redis/RedisRpcConfig.java | 61 ++++-- .../conf/redis/bean/RedisRpcClassHandler.java | 17 ++ .../gb28181/controller/PlayController.java | 10 +- .../iot/vmp/media/MediaServerConfig.java | 5 + .../iot/vmp/media/bean/MediaServer.java | 11 + .../service/impl/MediaServerServiceImpl.java | 22 +- .../media/zlm/ZLMMediaNodeServerService.java | 1 + .../redisMsg/IRedisRpcPlayService.java} | 6 +- .../service/redisMsg/IRedisRpcService.java | 1 + .../control/RedisRpcSendRtpController.java | 177 +++++++++++++++++ ...java => RedisRpcStreamPushController.java} | 188 ++---------------- .../redisMsg/dto/RedisRpcController.java | 16 ++ .../service/redisMsg/dto/RedisRpcMapping.java | 13 ++ .../service/RedisRpcPlayServiceImpl.java} | 11 +- .../redisMsg/service/RedisRpcServiceImpl.java | 16 +- .../vmp/storager/dao/MediaServerMapper.java | 34 ++-- 数据库/2.7.3/初始化-mysql-2.7.3.sql | 2 + .../2.7.3/初始化-postgresql-kingbase-2.7.3.sql | 2 + 18 files changed, 354 insertions(+), 239 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcClassHandler.java rename src/main/java/com/genersoft/iot/vmp/{gb28181/service/IGbChannelRpcPlayService.java => service/redisMsg/IRedisRpcPlayService.java} (68%) create mode 100644 src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcSendRtpController.java rename src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/{RedisRpcController.java => RedisRpcStreamPushController.java} (51%) create mode 100644 src/main/java/com/genersoft/iot/vmp/service/redisMsg/dto/RedisRpcController.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/redisMsg/dto/RedisRpcMapping.java rename src/main/java/com/genersoft/iot/vmp/{gb28181/service/impl/PlayRpcService.java => service/redisMsg/service/RedisRpcPlayServiceImpl.java} (86%) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java index b762838c..6607acec 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java @@ -3,10 +3,13 @@ package com.genersoft.iot.vmp.conf.redis; import com.alibaba.fastjson2.JSON; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcClassHandler; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; -import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController; +import com.genersoft.iot.vmp.jt1078.util.ClassUtil; +import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController; +import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -16,8 +19,11 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; @@ -36,9 +42,6 @@ public class RedisRpcConfig implements MessageListener { @Autowired private UserSetting userSetting; - @Autowired - private RedisRpcController redisRpcController; - @Autowired private RedisTemplate redisTemplate; @@ -48,6 +51,37 @@ public class RedisRpcConfig implements MessageListener { @Autowired private ThreadPoolTaskExecutor taskExecutor; + private final static Map protocolHash = new HashMap<>(); + + // 启动时执行 + @PostConstruct + public void init(){ + List> classList = ClassUtil.getClassList("com.genersoft.iot.vmp.service.redisMsg.control", RedisRpcController.class); + for (Class handlerClass : classList) { + String controllerPath = handlerClass.getAnnotation(RedisRpcController.class).value(); + // 扫描其下的方法 + Method[] methods = handlerClass.getDeclaredMethods(); + for (Method method : methods) { + RedisRpcMapping annotation = method.getAnnotation(RedisRpcMapping.class); + if (annotation != null) { + String methodPath = annotation.value(); + if (methodPath != null) { + protocolHash.put(controllerPath + "/" + methodPath, new RedisRpcClassHandler(handlerClass, method)); + } + } + + } + + } + for (String s : protocolHash.keySet()) { + System.out.println(s); + } + if (log.isDebugEnabled()) { + log.debug("消息ID缓存表 protocolHash:{}", protocolHash); + } + } + + @Override public void onMessage(Message message, byte[] pattern) { boolean isEmpty = taskQueue.isEmpty(); @@ -87,7 +121,9 @@ public class RedisRpcConfig implements MessageListener { return; } log.info("[redis-rpc] << {}", request); - Method method = getMethod(request.getUri()); + RedisRpcClassHandler redisRpcClassHandler = protocolHash.get(request.getUri()); + Class objectClass = redisRpcClassHandler.getObjectClass(); + Method method = redisRpcClassHandler.getMethod(); // 没有携带目标ID的可以理解为哪个wvp有结果就哪个回复,携带目标ID,但是如果是不存在的uri则直接回复404 if (userSetting.getServerId().equals(request.getToId())) { if (method == null) { @@ -97,7 +133,7 @@ public class RedisRpcConfig implements MessageListener { sendResponse(response); return; } - RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request); + RedisRpcResponse response = (RedisRpcResponse)method.invoke(objectClass, request); if(response != null) { sendResponse(response); } @@ -105,7 +141,7 @@ public class RedisRpcConfig implements MessageListener { if (method == null) { return; } - RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request); + RedisRpcResponse response = (RedisRpcResponse)method.invoke(objectClass, request); if (response != null) { sendResponse(response); } @@ -116,17 +152,6 @@ public class RedisRpcConfig implements MessageListener { } - private Method getMethod(String name) { - // 启动后扫描所有的路径注解 - Method[] methods = redisRpcController.getClass().getMethods(); - for (Method method : methods) { - if (method.getName().equals(name)) { - return method; - } - } - return null; - } - private void sendResponse(RedisRpcResponse response){ log.info("[redis-rpc] >> {}", response); response.setToId(userSetting.getServerId()); diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcClassHandler.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcClassHandler.java new file mode 100644 index 00000000..8c49a79f --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcClassHandler.java @@ -0,0 +1,17 @@ +package com.genersoft.iot.vmp.conf.redis.bean; + +import lombok.Data; + +import java.lang.reflect.Method; + +@Data +public class RedisRpcClassHandler { + + private Class objectClass; + private Method method; + + public RedisRpcClassHandler(Class objectClass, Method method) { + this.objectClass = objectClass; + this.method = method; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java index d9f7e731..e5087cba 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java @@ -10,7 +10,10 @@ import com.genersoft.iot.vmp.conf.security.JwtUtils; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; -import com.genersoft.iot.vmp.gb28181.service.*; +import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; +import com.genersoft.iot.vmp.gb28181.service.IDeviceService; +import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; +import com.genersoft.iot.vmp.gb28181.service.IPlayService; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; @@ -18,6 +21,7 @@ import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -63,7 +67,7 @@ public class PlayController { private IPlayService playService; @Autowired - private IGbChannelRpcPlayService playRpcService; + private IRedisRpcPlayService redisRpcPlayService; @Autowired private IMediaServerService mediaServerService; @@ -151,7 +155,7 @@ public class PlayController { }; // 判断设备是否属于当前平台, 如果不属于则发起自动调用 if (userSetting.getServerId().equals(device.getServerId())) { - playRpcService.play(device.getServerId(), channel.getId(), callback); + redisRpcPlayService.play(device.getServerId(), channel.getId(), callback); }else { playService.play(device, channel, callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java b/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java index b9f7c08e..ba27fb35 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.media; import com.genersoft.iot.vmp.conf.MediaConfig; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; @@ -30,6 +31,9 @@ public class MediaServerConfig implements CommandLineRunner { @Autowired private MediaConfig mediaConfig; + @Autowired + private UserSetting userSetting; + @Override public void run(String... strings) throws Exception { @@ -37,6 +41,7 @@ public class MediaServerConfig implements CommandLineRunner { mediaServerService.clearMediaServerForOnline(); MediaServer defaultMediaServer = mediaServerService.getDefaultMediaServer(); MediaServer mediaSerItemInConfig = mediaConfig.getMediaSerItem(); + mediaSerItemInConfig.setServerId(userSetting.getServerId()); if (defaultMediaServer != null && mediaSerItemInConfig.getId().equals(defaultMediaServer.getId())) { mediaServerService.update(mediaSerItemInConfig); }else { diff --git a/src/main/java/com/genersoft/iot/vmp/media/bean/MediaServer.java b/src/main/java/com/genersoft/iot/vmp/media/bean/MediaServer.java index e6faac36..2660cb6a 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/bean/MediaServer.java +++ b/src/main/java/com/genersoft/iot/vmp/media/bean/MediaServer.java @@ -103,6 +103,9 @@ public class MediaServer { @Schema(description = "转码的前缀") private String transcodeSuffix; + @Schema(description = "服务Id") + private String serverId; + public MediaServer() { } @@ -388,4 +391,12 @@ public class MediaServer { public void setTranscodeSuffix(String transcodeSuffix) { this.transcodeSuffix = transcodeSuffix; } + + public String getServerId() { + return serverId; + } + + public void setServerId(String serverId) { + this.serverId = serverId; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index f755c5f5..3c38035e 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -303,7 +303,7 @@ public class MediaServerServiceImpl implements IMediaServerService { mediaServerMapper.update(mediaSerItem); MediaServer mediaServerInRedis = getOne(mediaSerItem.getId()); // 获取完整数据 - MediaServer mediaServerInDataBase = mediaServerMapper.queryOne(mediaSerItem.getId()); + MediaServer mediaServerInDataBase = mediaServerMapper.queryOne(mediaSerItem.getId(), userSetting.getServerId()); if (mediaServerInDataBase == null) { return; } @@ -350,7 +350,7 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public List getAll() { - List mediaServerList = mediaServerMapper.queryAll(); + List mediaServerList = mediaServerMapper.queryAll(userSetting.getServerId()); if (mediaServerList.isEmpty()) { return new ArrayList<>(); } @@ -366,7 +366,7 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public List getAllFromDatabase() { - return mediaServerMapper.queryAll(); + return mediaServerMapper.queryAll(userSetting.getServerId()); } @Override @@ -403,7 +403,7 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public MediaServer getDefaultMediaServer() { - return mediaServerMapper.queryDefault(); + return mediaServerMapper.queryDefault(userSetting.getServerId()); } @Override @@ -423,7 +423,7 @@ public class MediaServerServiceImpl implements IMediaServerService { log.info("[添加媒体节点] 失败, mediaServer的类型:为空"); return; } - if (mediaServerMapper.queryOne(mediaServer.getId()) != null) { + if (mediaServerMapper.queryOne(mediaServer.getId(), userSetting.getServerId()) != null) { log.info("[添加媒体节点] 失败, 媒体服务ID已存在,请修改媒体服务器配置, {}", mediaServer.getId()); throw new ControllerException(ErrorCode.ERROR100.getCode(),"保存失败,媒体服务ID [ " + mediaServer.getId() + " ] 已存在,请修改媒体服务器配置"); } @@ -521,7 +521,7 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public MediaServer checkMediaServer(String ip, int port, String secret, String type) { - if (mediaServerMapper.queryOneByHostAndPort(ip, port) != null) { + if (mediaServerMapper.queryOneByHostAndPort(ip, port, userSetting.getServerId()) != null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "此连接已存在"); } @@ -532,7 +532,7 @@ public class MediaServerServiceImpl implements IMediaServerService { } MediaServer mediaServer = mediaNodeServerService.checkMediaServer(ip, port, secret); if (mediaServer != null) { - if (mediaServerMapper.queryOne(mediaServer.getId()) != null) { + if (mediaServerMapper.queryOne(mediaServer.getId(), userSetting.getServerId()) != null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "媒体服务ID [" + mediaServer.getId() + " ] 已存在,请修改媒体服务器配置"); } } @@ -560,7 +560,7 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public void delete(MediaServer mediaServer) { - mediaServerMapper.delOne(mediaServer.getId()); + mediaServerMapper.delOne(mediaServer.getId(), userSetting.getServerId()); redisTemplate.opsForZSet().remove(VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId(), mediaServer.getId()); String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + mediaServer.getId(); redisTemplate.delete(key); @@ -572,13 +572,13 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public MediaServer getOneFromDatabase(String mediaServerId) { - return mediaServerMapper.queryOne(mediaServerId); + return mediaServerMapper.queryOne(mediaServerId, userSetting.getServerId()); } @Override public void syncCatchFromDatabase() { List allInCatch = getAllOnlineList(); - List allInDatabase = mediaServerMapper.queryAll(); + List allInDatabase = mediaServerMapper.queryAll(userSetting.getServerId()); Map mediaServerMap = new HashMap<>(); for (MediaServer mediaServer : allInDatabase) { @@ -606,7 +606,7 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public List getAllWithAssistPort() { - return mediaServerMapper.queryAllWithAssistPort(); + return mediaServerMapper.queryAllWithAssistPort(userSetting.getServerId()); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java index 802b7eb6..da44d23d 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java @@ -89,6 +89,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { @Override public MediaServer checkMediaServer(String ip, int port, String secret) { MediaServer mediaServer = new MediaServer(); + mediaServer.setServerId(userSetting.getServerId()); mediaServer.setIp(ip); mediaServer.setHttpPort(port); mediaServer.setFlvPort(port); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelRpcPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java similarity index 68% rename from src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelRpcPlayService.java rename to src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java index 1847e430..1ae1268c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelRpcPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java @@ -1,8 +1,10 @@ -package com.genersoft.iot.vmp.gb28181.service; +package com.genersoft.iot.vmp.service.redisMsg; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.service.bean.ErrorCallback; -public interface IGbChannelRpcPlayService { +public interface IRedisRpcPlayService { + + void play(String serverId, Integer channelId, ErrorCallback callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java index 7e23e276..6dcfb5fb 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -23,4 +23,5 @@ public interface IRedisRpcService { long onStreamOnlineEvent(String app, String stream, CommonCallback callback); void unPushStreamOnlineEvent(String app, String stream); + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcSendRtpController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcSendRtpController.java new file mode 100644 index 00000000..1077f578 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcSendRtpController.java @@ -0,0 +1,177 @@ +package com.genersoft.iot.vmp.service.redisMsg.control; + +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; +import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService; +import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; +import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.media.bean.MediaInfo; +import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; +import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.ISendRtpServerService; +import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController; +import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; + +import javax.sip.message.Response; + +@Slf4j +@RedisRpcController("sendRtp") +public class RedisRpcSendRtpController { + + @Autowired + private SSRCFactory ssrcFactory; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private ISendRtpServerService sendRtpServerService; + + @Autowired + private UserSetting userSetting; + + @Autowired + private HookSubscribe hookSubscribe; + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private IGbChannelService channelService; + + @Autowired + private IGbChannelPlayService channelPlayService; + + /** + * 获取发流的信息 + */ + @RedisRpcMapping("getSendRtpItem") + public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) { + String callId = request.getParam().toString(); + SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId); + if (sendRtpItem == null) { + log.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息, callId:{}", callId); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + return response; + } + log.info("[redis-rpc] 获取发流的信息: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); + // 查询本级是否有这个流 + MediaServer mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); + if (mediaServerItem == null) { + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + } + // 自平台内容 + int localPort = sendRtpServerService.getNextPort(mediaServerItem); + if (localPort == 0) { + log.info("[redis-rpc] getSendRtpItem->服务器端口资源不足" ); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + } + // 写入redis, 超时时回复 + sendRtpItem.setStatus(1); + sendRtpItem.setServerId(userSetting.getServerId()); + sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); + if (sendRtpItem.getSsrc() == null) { + // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } + sendRtpServerService.update(sendRtpItem); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + response.setBody(callId); + return response; + } + + /** + * 开始发流 + */ + @RedisRpcMapping("startSendRtp") + public RedisRpcResponse startSendRtp(RedisRpcRequest request) { + String callId = request.getParam().toString(); + SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + if (sendRtpItem == null) { + log.info("[redis-rpc] 开始发流, 未找到redis中的发流信息, callId:{}", callId); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息"); + response.setBody(wvpResult); + return response; + } + log.info("[redis-rpc] 开始发流: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); + MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaServer == null) { + log.info("[redis-rpc] startSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() ); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer"); + response.setBody(wvpResult); + return response; + } + MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream()); + if (mediaInfo == null) { + log.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "流不在线"); + response.setBody(wvpResult); + return response; + } + try { + mediaServerService.startSendRtp(mediaServer, sendRtpItem); + }catch (ControllerException exception) { + log.info("[redis-rpc] 发流失败: {}/{}, 目标地址: {}:{}, {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getMsg()); + WVPResult wvpResult = WVPResult.fail(exception.getCode(), exception.getMsg()); + response.setBody(wvpResult); + return response; + } + log.info("[redis-rpc] 发流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); + WVPResult wvpResult = WVPResult.success(); + response.setBody(wvpResult); + return response; + } + + /** + * 停止发流 + */ + @RedisRpcMapping("stopSendRtp") + public RedisRpcResponse stopSendRtp(RedisRpcRequest request) { + String callId = request.getParam().toString(); + SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(Response.OK); + if (sendRtpItem == null) { + log.info("[redis-rpc] 停止推流, 未找到redis中的发流信息, key:{}", callId); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息"); + response.setBody(wvpResult); + return response; + } + log.info("[redis-rpc] 停止推流: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); + MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaServer == null) { + log.info("[redis-rpc] stopSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() ); + WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer"); + response.setBody(wvpResult); + return response; + } + try { + mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); + }catch (ControllerException exception) { + log.info("[redis-rpc] 停止推流失败: {}/{}, 目标地址: {}:{}, code: {}, msg: {}", sendRtpItem.getApp(), + sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getCode(), exception.getMsg() ); + response.setBody(WVPResult.fail(exception.getCode(), exception.getMsg())); + return response; + } + log.info("[redis-rpc] 停止推流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); + response.setBody(WVPResult.success()); + return response; + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java similarity index 51% rename from src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java rename to src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java index 946178bd..3465bdc8 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java @@ -3,39 +3,29 @@ package com.genersoft.iot.vmp.service.redisMsg.control; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; -import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; -import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.ISendRtpServerService; -import com.genersoft.iot.vmp.service.bean.InviteErrorCode; -import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController; +import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.stereotype.Component; -import javax.sip.message.Response; - -/** - * 其他wvp发起的rpc调用,这里的方法被 RedisRpcConfig 通过反射寻找对应的方法名称调用 - */ @Slf4j -@Component -public class RedisRpcController { +@RedisRpcController("streamPush") +public class RedisRpcStreamPushController { @Autowired private SSRCFactory ssrcFactory; @@ -61,52 +51,18 @@ public class RedisRpcController { @Autowired private IGbChannelPlayService channelPlayService; - - /** - * 获取发流的信息 - */ - public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) { - String callId = request.getParam().toString(); - SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId); - if (sendRtpItem == null) { - log.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息, callId:{}", callId); - RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); - return response; - } - log.info("[redis-rpc] 获取发流的信息: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); - // 查询本级是否有这个流 - MediaServer mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); - if (mediaServerItem == null) { - RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); - } - // 自平台内容 - int localPort = sendRtpServerService.getNextPort(mediaServerItem); - if (localPort == 0) { - log.info("[redis-rpc] getSendRtpItem->服务器端口资源不足" ); - RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); - } - // 写入redis, 超时时回复 - sendRtpItem.setStatus(1); - sendRtpItem.setServerId(userSetting.getServerId()); - sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); - if (sendRtpItem.getSsrc() == null) { - // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 - String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); - sendRtpItem.setSsrc(ssrc); - } - sendRtpServerService.update(sendRtpItem); - RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); - response.setBody(callId); - return response; + private void sendResponse(RedisRpcResponse response){ + log.info("[redis-rpc] >> {}", response); + response.setToId(userSetting.getServerId()); + RedisRpcMessage message = new RedisRpcMessage(); + message.setResponse(response); + redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message); } /** * 监听流上线 */ + @RedisRpcMapping("waitePushStreamOnline") public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) { SendRtpInfo sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpInfo.class); log.info("[redis-rpc] 监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); @@ -158,6 +114,7 @@ public class RedisRpcController { /** * 监听流上线 */ + @RedisRpcMapping("onStreamOnlineEvent") public RedisRpcResponse onStreamOnlineEvent(RedisRpcRequest request) { StreamInfo streamInfo = JSONObject.parseObject(request.getParam().toString(), StreamInfo.class); log.info("[redis-rpc] 监听流信息,等待流上线: {}/{}", streamInfo.getApp(), streamInfo.getStream()); @@ -191,6 +148,7 @@ public class RedisRpcController { /** * 停止监听流上线 */ + @RedisRpcMapping("stopWaitePushStreamOnline") public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) { SendRtpInfo sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpInfo.class); log.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); @@ -205,6 +163,7 @@ public class RedisRpcController { /** * 停止监听流上线 */ + @RedisRpcMapping("unPushStreamOnlineEvent") public RedisRpcResponse unPushStreamOnlineEvent(RedisRpcRequest request) { StreamInfo streamInfo = JSONObject.parseObject(request.getParam().toString(), StreamInfo.class); log.info("[redis-rpc] 停止监听流上线: {}/{}", streamInfo.getApp(), streamInfo.getStream()); @@ -216,123 +175,4 @@ public class RedisRpcController { return response; } - - /** - * 开始发流 - */ - public RedisRpcResponse startSendRtp(RedisRpcRequest request) { - String callId = request.getParam().toString(); - SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId); - RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); - if (sendRtpItem == null) { - log.info("[redis-rpc] 开始发流, 未找到redis中的发流信息, callId:{}", callId); - WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息"); - response.setBody(wvpResult); - return response; - } - log.info("[redis-rpc] 开始发流: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); - MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - if (mediaServer == null) { - log.info("[redis-rpc] startSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() ); - WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer"); - response.setBody(wvpResult); - return response; - } - MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream()); - if (mediaInfo == null) { - log.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); - WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "流不在线"); - response.setBody(wvpResult); - return response; - } - try { - mediaServerService.startSendRtp(mediaServer, sendRtpItem); - }catch (ControllerException exception) { - log.info("[redis-rpc] 发流失败: {}/{}, 目标地址: {}:{}, {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getMsg()); - WVPResult wvpResult = WVPResult.fail(exception.getCode(), exception.getMsg()); - response.setBody(wvpResult); - return response; - } - log.info("[redis-rpc] 发流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); - WVPResult wvpResult = WVPResult.success(); - response.setBody(wvpResult); - return response; - } - - /** - * 停止发流 - */ - public RedisRpcResponse stopSendRtp(RedisRpcRequest request) { - String callId = request.getParam().toString(); - SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId); - RedisRpcResponse response = request.getResponse(); - response.setStatusCode(Response.OK); - if (sendRtpItem == null) { - log.info("[redis-rpc] 停止推流, 未找到redis中的发流信息, key:{}", callId); - WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息"); - response.setBody(wvpResult); - return response; - } - log.info("[redis-rpc] 停止推流: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); - MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - if (mediaServer == null) { - log.info("[redis-rpc] stopSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() ); - WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer"); - response.setBody(wvpResult); - return response; - } - try { - mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); - }catch (ControllerException exception) { - log.info("[redis-rpc] 停止推流失败: {}/{}, 目标地址: {}:{}, code: {}, msg: {}", sendRtpItem.getApp(), - sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getCode(), exception.getMsg() ); - response.setBody(WVPResult.fail(exception.getCode(), exception.getMsg())); - return response; - } - log.info("[redis-rpc] 停止推流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); - response.setBody(WVPResult.success()); - return response; - } - - private void sendResponse(RedisRpcResponse response){ - log.info("[redis-rpc] >> {}", response); - response.setToId(userSetting.getServerId()); - RedisRpcMessage message = new RedisRpcMessage(); - message.setResponse(response); - redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message); - } - - /** - * 点播国标设备 - */ - public RedisRpcResponse playChannel(RedisRpcRequest request) { - int channelId = Integer.parseInt(request.getParam().toString()); - RedisRpcResponse response = request.getResponse(); - - if (channelId <= 0) { - response.setStatusCode(Response.BAD_REQUEST); - response.setBody("param error"); - return response; - } - // 获取对应的设备和通道信息 - CommonGBChannel channel = channelService.getOne(channelId); - if (channel == null) { - response.setStatusCode(Response.BAD_REQUEST); - response.setBody("param error"); - return response; - } - - channelPlayService.play(channel, null, (code, msg, data) ->{ - if (code == InviteErrorCode.SUCCESS.getCode()) { - response.setStatusCode(Response.OK); - response.setBody(data); - }else { - response.setStatusCode(code); - } - // 手动发送结果 - sendResponse(response); - }); - return null; - } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/dto/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/dto/RedisRpcController.java new file mode 100644 index 00000000..79cdcb37 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/dto/RedisRpcController.java @@ -0,0 +1,16 @@ +package com.genersoft.iot.vmp.service.redisMsg.dto; + +import org.springframework.stereotype.Component; + +import java.lang.annotation.*; + +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Component +public @interface RedisRpcController { + /** + * 请求路径 + */ + String value() default ""; +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/dto/RedisRpcMapping.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/dto/RedisRpcMapping.java new file mode 100644 index 00000000..61f51bb6 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/dto/RedisRpcMapping.java @@ -0,0 +1,13 @@ +package com.genersoft.iot.vmp.service.redisMsg.dto; + +import java.lang.annotation.*; + +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface RedisRpcMapping { + /** + * 请求路径 + */ + String value() default ""; +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java similarity index 86% rename from src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayRpcService.java rename to src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java index 96eb3ced..93f218a6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayRpcService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.gb28181.service.impl; +package com.genersoft.iot.vmp.service.redisMsg.service; import com.alibaba.fastjson2.JSON; import com.genersoft.iot.vmp.common.StreamInfo; @@ -6,8 +6,8 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; -import com.genersoft.iot.vmp.gb28181.service.IGbChannelRpcPlayService; import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -16,8 +16,9 @@ import org.springframework.stereotype.Service; import javax.sip.message.Response; @Slf4j -@Service("playRpcService") -public class PlayRpcService implements IGbChannelRpcPlayService { +@Service +public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService { + @Autowired private RedisRpcConfig redisRpcConfig; @@ -36,7 +37,6 @@ public class PlayRpcService implements IGbChannelRpcPlayService { @Override public void play(String serverId, Integer channelId, ErrorCallback callback) { - log.info("[点播其他WVP的设备] 通道Id:{}", channelId); RedisRpcRequest request = buildRequest("playChannel", channelId); request.setToId(serverId); RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout()); @@ -52,3 +52,4 @@ public class PlayRpcService implements IGbChannelRpcPlayService { } } } + diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index 86182135..939dcb05 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -58,7 +58,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { @Override public SendRtpInfo getSendRtpItem(String callId) { - RedisRpcRequest request = buildRequest("getSendRtpItem", callId); + RedisRpcRequest request = buildRequest("sendRtp/getSendRtpItem", callId); RedisRpcResponse response = redisRpcConfig.request(request, 10); if (response.getBody() == null) { return null; @@ -69,7 +69,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { @Override public WVPResult startSendRtp(String callId, SendRtpInfo sendRtpItem) { log.info("[请求其他WVP] 开始推流,wvp:{}, {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream()); - RedisRpcRequest request = buildRequest("startSendRtp", callId); + RedisRpcRequest request = buildRequest("sendRtp/startSendRtp", callId); request.setToId(sendRtpItem.getServerId()); RedisRpcResponse response = redisRpcConfig.request(request, 10); return JSON.parseObject(response.getBody().toString(), WVPResult.class); @@ -83,7 +83,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { return WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到发流信息"); } log.info("[请求其他WVP] 停止推流,wvp:{}, {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream()); - RedisRpcRequest request = buildRequest("stopSendRtp", callId); + RedisRpcRequest request = buildRequest("sendRtp/stopSendRtp", callId); request.setToId(sendRtpItem.getServerId()); RedisRpcResponse response = redisRpcConfig.request(request, 10); return JSON.parseObject(response.getBody().toString(), WVPResult.class); @@ -94,7 +94,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { log.info("[请求所有WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null); - RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem); + RedisRpcRequest request = buildRequest("streamPush/waitePushStreamOnline", sendRtpItem); request.setToId(sendRtpItem.getServerId()); hookSubscribe.addSubscribe(hook, (hookData) -> { @@ -135,7 +135,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { log.info("[停止WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null); hookSubscribe.removeSubscribe(hook); - RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem); + RedisRpcRequest request = buildRequest("streamPush/stopWaitePushStreamOnline", sendRtpItem); request.setToId(sendRtpItem.getServerId()); redisRpcConfig.request(request, 10); } @@ -147,7 +147,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { log.info("[停止WVP监听流上线] 未找到redis中的发流信息, key:{}", callId); return; } - RedisRpcRequest request = buildRequest("rtpSendStopped", callId); + RedisRpcRequest request = buildRequest("streamPush/rtpSendStopped", callId); request.setToId(sendRtpItem.getServerId()); redisRpcConfig.request(request, 10); } @@ -166,7 +166,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { StreamInfo streamInfoParam = new StreamInfo(); streamInfoParam.setApp(app); streamInfoParam.setStream(stream); - RedisRpcRequest request = buildRequest("onStreamOnlineEvent", streamInfoParam); + RedisRpcRequest request = buildRequest("streamPush/onStreamOnlineEvent", streamInfoParam); hookSubscribe.addSubscribe(hook, (hookData) -> { log.info("[请求所有WVP监听流上线] 监听流上线 {}/{}", app, stream); if (callback != null) { @@ -198,7 +198,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { StreamInfo streamInfoParam = new StreamInfo(); streamInfoParam.setApp(app); streamInfoParam.setStream(stream); - RedisRpcRequest request = buildRequest("unPushStreamOnlineEvent", streamInfoParam); + RedisRpcRequest request = buildRequest("streamPush/unPushStreamOnlineEvent", streamInfoParam); redisRpcConfig.request(request, 10); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java index 2f2bfb6d..8682461f 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java @@ -42,6 +42,7 @@ public interface MediaServerMapper { "create_time,"+ "update_time,"+ "transcode_suffix,"+ + "server_id,"+ "hook_alive_interval"+ ") VALUES " + "(" + @@ -74,6 +75,7 @@ public interface MediaServerMapper { "#{createTime}, " + "#{updateTime}, " + "#{transcodeSuffix}, " + + "#{serverId}, " + "#{hookAliveInterval})") int add(MediaServer mediaServerItem); @@ -105,6 +107,7 @@ public interface MediaServerMapper { ", record_day=#{recordDay}" + ", record_path=#{recordPath}" + ", transcode_suffix=#{transcodeSuffix}" + + ", server_id=#{serverId}" + ", type=#{type}" + "WHERE id=#{id}"+ " "}) @@ -138,32 +141,27 @@ public interface MediaServerMapper { ", type=#{type}" + ", transcode_suffix=#{transcodeSuffix}" + ", hook_alive_interval=#{hookAliveInterval}" + + ", server_id=#{serverId}" + "WHERE ip=#{ip} and http_port=#{httpPort}"+ " "}) int updateByHostAndPort(MediaServer mediaServerItem); - @Select("SELECT * FROM wvp_media_server WHERE id=#{id}") - MediaServer queryOne(String id); + @Select("SELECT * FROM wvp_media_server WHERE id=#{id} and server_id = #{serverId}") + MediaServer queryOne(@Param("id") String id, @Param("serverId") String serverId); - @Select("SELECT * FROM wvp_media_server") - List queryAll(); + @Select("SELECT * FROM wvp_media_server where server_id = #{serverId}") + List queryAll(@Param("serverId") String serverId); - @Delete("DELETE FROM wvp_media_server WHERE id=#{id}") - void delOne(String id); + @Delete("DELETE FROM wvp_media_server WHERE id=#{id} and server_id = #{serverId}") + void delOne(String id, @Param("serverId") String serverId); - @Select("DELETE FROM wvp_media_server WHERE ip=#{host} and http_port=#{port}") - void delOneByIPAndPort(@Param("host") String host, @Param("port") int port); + @Select("SELECT * FROM wvp_media_server WHERE ip=#{host} and http_port=#{port} and server_id = #{serverId}") + MediaServer queryOneByHostAndPort(@Param("host") String host, @Param("port") int port, @Param("serverId") String serverId); - @Delete("DELETE FROM wvp_media_server WHERE default_server=true") - int delDefault(); + @Select("SELECT * FROM wvp_media_server WHERE default_server=true and server_id = #{serverId}") + MediaServer queryDefault(@Param("serverId") String serverId); - @Select("SELECT * FROM wvp_media_server WHERE ip=#{host} and http_port=#{port}") - MediaServer queryOneByHostAndPort(@Param("host") String host, @Param("port") int port); - - @Select("SELECT * FROM wvp_media_server WHERE default_server=true") - MediaServer queryDefault(); - - @Select("SELECT * FROM wvp_media_server WHERE record_assist_port > 0") - List queryAllWithAssistPort(); + @Select("SELECT * FROM wvp_media_server WHERE record_assist_port > 0 and server_id = #{serverId}") + List queryAllWithAssistPort(@Param("serverId") String serverId); } diff --git a/数据库/2.7.3/初始化-mysql-2.7.3.sql b/数据库/2.7.3/初始化-mysql-2.7.3.sql index e12677da..5ffb722c 100644 --- a/数据库/2.7.3/初始化-mysql-2.7.3.sql +++ b/数据库/2.7.3/初始化-mysql-2.7.3.sql @@ -29,6 +29,7 @@ create table wvp_device custom_name character varying(255), sdp_ip character varying(50), local_ip character varying(50), + server_id character varying(50), password character varying(255), as_message_channel bool default false, keepalive_interval_time integer, @@ -190,6 +191,7 @@ create table wvp_media_server record_path character varying(255), record_day integer default 7, transcode_suffix character varying(255), + server_id character varying(50), constraint uk_media_server_unique_ip_http_port unique (ip, http_port) ); diff --git a/数据库/2.7.3/初始化-postgresql-kingbase-2.7.3.sql b/数据库/2.7.3/初始化-postgresql-kingbase-2.7.3.sql index c632f9a8..9789b463 100644 --- a/数据库/2.7.3/初始化-postgresql-kingbase-2.7.3.sql +++ b/数据库/2.7.3/初始化-postgresql-kingbase-2.7.3.sql @@ -29,6 +29,7 @@ create table wvp_device custom_name character varying(255), sdp_ip character varying(50), local_ip character varying(50), + server_id character varying(50), password character varying(255), as_message_channel bool default false, keepalive_interval_time integer, @@ -207,6 +208,7 @@ create table wvp_media_server record_path character varying(255), record_day integer default 7, transcode_suffix character varying(255), + server_id character varying(50), constraint uk_media_server_unique_ip_http_port unique (ip, http_port) );