[redis下的多wvp] 优化服务间方法调用

dev/数据库统合
648540858 2024-12-11 18:26:18 +08:00
parent 3afdfff5b2
commit 8ef71b0f2d
18 changed files with 354 additions and 239 deletions

View File

@ -3,10 +3,13 @@ package com.genersoft.iot.vmp.conf.redis;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcClassHandler;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController;
import com.genersoft.iot.vmp.jt1078.util.ClassUtil;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@ -16,8 +19,11 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
@ -36,9 +42,6 @@ public class RedisRpcConfig implements MessageListener {
@Autowired
private UserSetting userSetting;
@Autowired
private RedisRpcController redisRpcController;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@ -48,6 +51,37 @@ public class RedisRpcConfig implements MessageListener {
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
private final static Map<String, RedisRpcClassHandler> protocolHash = new HashMap<>();
// 启动时执行
@PostConstruct
public void init(){
List<Class<?>> classList = ClassUtil.getClassList("com.genersoft.iot.vmp.service.redisMsg.control", RedisRpcController.class);
for (Class<?> handlerClass : classList) {
String controllerPath = handlerClass.getAnnotation(RedisRpcController.class).value();
// 扫描其下的方法
Method[] methods = handlerClass.getDeclaredMethods();
for (Method method : methods) {
RedisRpcMapping annotation = method.getAnnotation(RedisRpcMapping.class);
if (annotation != null) {
String methodPath = annotation.value();
if (methodPath != null) {
protocolHash.put(controllerPath + "/" + methodPath, new RedisRpcClassHandler(handlerClass, method));
}
}
}
}
for (String s : protocolHash.keySet()) {
System.out.println(s);
}
if (log.isDebugEnabled()) {
log.debug("消息ID缓存表 protocolHash:{}", protocolHash);
}
}
@Override
public void onMessage(Message message, byte[] pattern) {
boolean isEmpty = taskQueue.isEmpty();
@ -87,7 +121,9 @@ public class RedisRpcConfig implements MessageListener {
return;
}
log.info("[redis-rpc] << {}", request);
Method method = getMethod(request.getUri());
RedisRpcClassHandler redisRpcClassHandler = protocolHash.get(request.getUri());
Class<?> objectClass = redisRpcClassHandler.getObjectClass();
Method method = redisRpcClassHandler.getMethod();
// 没有携带目标ID的可以理解为哪个wvp有结果就哪个回复携带目标ID但是如果是不存在的uri则直接回复404
if (userSetting.getServerId().equals(request.getToId())) {
if (method == null) {
@ -97,7 +133,7 @@ public class RedisRpcConfig implements MessageListener {
sendResponse(response);
return;
}
RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
RedisRpcResponse response = (RedisRpcResponse)method.invoke(objectClass, request);
if(response != null) {
sendResponse(response);
}
@ -105,7 +141,7 @@ public class RedisRpcConfig implements MessageListener {
if (method == null) {
return;
}
RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
RedisRpcResponse response = (RedisRpcResponse)method.invoke(objectClass, request);
if (response != null) {
sendResponse(response);
}
@ -116,17 +152,6 @@ public class RedisRpcConfig implements MessageListener {
}
private Method getMethod(String name) {
// 启动后扫描所有的路径注解
Method[] methods = redisRpcController.getClass().getMethods();
for (Method method : methods) {
if (method.getName().equals(name)) {
return method;
}
}
return null;
}
private void sendResponse(RedisRpcResponse response){
log.info("[redis-rpc] >> {}", response);
response.setToId(userSetting.getServerId());

View File

@ -0,0 +1,17 @@
package com.genersoft.iot.vmp.conf.redis.bean;
import lombok.Data;
import java.lang.reflect.Method;
@Data
public class RedisRpcClassHandler {
private Class<?> objectClass;
private Method method;
public RedisRpcClassHandler(Class<?> objectClass, Method method) {
this.objectClass = objectClass;
this.method = method;
}
}

View File

@ -10,7 +10,10 @@ import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.service.*;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.service.IPlayService;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
@ -18,6 +21,7 @@ import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@ -63,7 +67,7 @@ public class PlayController {
private IPlayService playService;
@Autowired
private IGbChannelRpcPlayService playRpcService;
private IRedisRpcPlayService redisRpcPlayService;
@Autowired
private IMediaServerService mediaServerService;
@ -151,7 +155,7 @@ public class PlayController {
};
// 判断设备是否属于当前平台, 如果不属于则发起自动调用
if (userSetting.getServerId().equals(device.getServerId())) {
playRpcService.play(device.getServerId(), channel.getId(), callback);
redisRpcPlayService.play(device.getServerId(), channel.getId(), callback);
}else {
playService.play(device, channel, callback);
}

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.media;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
@ -30,6 +31,9 @@ public class MediaServerConfig implements CommandLineRunner {
@Autowired
private MediaConfig mediaConfig;
@Autowired
private UserSetting userSetting;
@Override
public void run(String... strings) throws Exception {
@ -37,6 +41,7 @@ public class MediaServerConfig implements CommandLineRunner {
mediaServerService.clearMediaServerForOnline();
MediaServer defaultMediaServer = mediaServerService.getDefaultMediaServer();
MediaServer mediaSerItemInConfig = mediaConfig.getMediaSerItem();
mediaSerItemInConfig.setServerId(userSetting.getServerId());
if (defaultMediaServer != null && mediaSerItemInConfig.getId().equals(defaultMediaServer.getId())) {
mediaServerService.update(mediaSerItemInConfig);
}else {

View File

@ -103,6 +103,9 @@ public class MediaServer {
@Schema(description = "转码的前缀")
private String transcodeSuffix;
@Schema(description = "服务Id")
private String serverId;
public MediaServer() {
}
@ -388,4 +391,12 @@ public class MediaServer {
public void setTranscodeSuffix(String transcodeSuffix) {
this.transcodeSuffix = transcodeSuffix;
}
public String getServerId() {
return serverId;
}
public void setServerId(String serverId) {
this.serverId = serverId;
}
}

View File

@ -303,7 +303,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
mediaServerMapper.update(mediaSerItem);
MediaServer mediaServerInRedis = getOne(mediaSerItem.getId());
// 获取完整数据
MediaServer mediaServerInDataBase = mediaServerMapper.queryOne(mediaSerItem.getId());
MediaServer mediaServerInDataBase = mediaServerMapper.queryOne(mediaSerItem.getId(), userSetting.getServerId());
if (mediaServerInDataBase == null) {
return;
}
@ -350,7 +350,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override
public List<MediaServer> getAll() {
List<MediaServer> mediaServerList = mediaServerMapper.queryAll();
List<MediaServer> mediaServerList = mediaServerMapper.queryAll(userSetting.getServerId());
if (mediaServerList.isEmpty()) {
return new ArrayList<>();
}
@ -366,7 +366,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override
public List<MediaServer> getAllFromDatabase() {
return mediaServerMapper.queryAll();
return mediaServerMapper.queryAll(userSetting.getServerId());
}
@Override
@ -403,7 +403,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override
public MediaServer getDefaultMediaServer() {
return mediaServerMapper.queryDefault();
return mediaServerMapper.queryDefault(userSetting.getServerId());
}
@Override
@ -423,7 +423,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
log.info("[添加媒体节点] 失败, mediaServer的类型为空");
return;
}
if (mediaServerMapper.queryOne(mediaServer.getId()) != null) {
if (mediaServerMapper.queryOne(mediaServer.getId(), userSetting.getServerId()) != null) {
log.info("[添加媒体节点] 失败, 媒体服务ID已存在请修改媒体服务器配置, {}", mediaServer.getId());
throw new ControllerException(ErrorCode.ERROR100.getCode(),"保存失败媒体服务ID [ " + mediaServer.getId() + " ] 已存在,请修改媒体服务器配置");
}
@ -521,7 +521,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override
public MediaServer checkMediaServer(String ip, int port, String secret, String type) {
if (mediaServerMapper.queryOneByHostAndPort(ip, port) != null) {
if (mediaServerMapper.queryOneByHostAndPort(ip, port, userSetting.getServerId()) != null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "此连接已存在");
}
@ -532,7 +532,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
MediaServer mediaServer = mediaNodeServerService.checkMediaServer(ip, port, secret);
if (mediaServer != null) {
if (mediaServerMapper.queryOne(mediaServer.getId()) != null) {
if (mediaServerMapper.queryOne(mediaServer.getId(), userSetting.getServerId()) != null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "媒体服务ID [" + mediaServer.getId() + " ] 已存在,请修改媒体服务器配置");
}
}
@ -560,7 +560,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override
public void delete(MediaServer mediaServer) {
mediaServerMapper.delOne(mediaServer.getId());
mediaServerMapper.delOne(mediaServer.getId(), userSetting.getServerId());
redisTemplate.opsForZSet().remove(VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId(), mediaServer.getId());
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + mediaServer.getId();
redisTemplate.delete(key);
@ -572,13 +572,13 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override
public MediaServer getOneFromDatabase(String mediaServerId) {
return mediaServerMapper.queryOne(mediaServerId);
return mediaServerMapper.queryOne(mediaServerId, userSetting.getServerId());
}
@Override
public void syncCatchFromDatabase() {
List<MediaServer> allInCatch = getAllOnlineList();
List<MediaServer> allInDatabase = mediaServerMapper.queryAll();
List<MediaServer> allInDatabase = mediaServerMapper.queryAll(userSetting.getServerId());
Map<String, MediaServer> mediaServerMap = new HashMap<>();
for (MediaServer mediaServer : allInDatabase) {
@ -606,7 +606,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override
public List<MediaServer> getAllWithAssistPort() {
return mediaServerMapper.queryAllWithAssistPort();
return mediaServerMapper.queryAllWithAssistPort(userSetting.getServerId());
}

View File

@ -89,6 +89,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
@Override
public MediaServer checkMediaServer(String ip, int port, String secret) {
MediaServer mediaServer = new MediaServer();
mediaServer.setServerId(userSetting.getServerId());
mediaServer.setIp(ip);
mediaServer.setHttpPort(port);
mediaServer.setFlvPort(port);

View File

@ -1,8 +1,10 @@
package com.genersoft.iot.vmp.gb28181.service;
package com.genersoft.iot.vmp.service.redisMsg;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
public interface IGbChannelRpcPlayService {
public interface IRedisRpcPlayService {
void play(String serverId, Integer channelId, ErrorCallback<StreamInfo> callback);
}

View File

@ -23,4 +23,5 @@ public interface IRedisRpcService {
long onStreamOnlineEvent(String app, String stream, CommonCallback<StreamInfo> callback);
void unPushStreamOnlineEvent(String app, String stream);
}

View File

@ -0,0 +1,177 @@
package com.genersoft.iot.vmp.service.redisMsg.control;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import javax.sip.message.Response;
@Slf4j
@RedisRpcController("sendRtp")
public class RedisRpcSendRtpController {
@Autowired
private SSRCFactory ssrcFactory;
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private ISendRtpServerService sendRtpServerService;
@Autowired
private UserSetting userSetting;
@Autowired
private HookSubscribe hookSubscribe;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private IGbChannelService channelService;
@Autowired
private IGbChannelPlayService channelPlayService;
/**
*
*/
@RedisRpcMapping("getSendRtpItem")
public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) {
String callId = request.getParam().toString();
SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
if (sendRtpItem == null) {
log.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息 callId{}", callId);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
return response;
}
log.info("[redis-rpc] 获取发流的信息: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
// 查询本级是否有这个流
MediaServer mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
if (mediaServerItem == null) {
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
}
// 自平台内容
int localPort = sendRtpServerService.getNextPort(mediaServerItem);
if (localPort == 0) {
log.info("[redis-rpc] getSendRtpItem->服务器端口资源不足" );
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
}
// 写入redis 超时时回复
sendRtpItem.setStatus(1);
sendRtpItem.setServerId(userSetting.getServerId());
sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
if (sendRtpItem.getSsrc() == null) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
sendRtpItem.setSsrc(ssrc);
}
sendRtpServerService.update(sendRtpItem);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
response.setBody(callId);
return response;
}
/**
*
*/
@RedisRpcMapping("startSendRtp")
public RedisRpcResponse startSendRtp(RedisRpcRequest request) {
String callId = request.getParam().toString();
SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
if (sendRtpItem == null) {
log.info("[redis-rpc] 开始发流, 未找到redis中的发流信息 callId{}", callId);
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
response.setBody(wvpResult);
return response;
}
log.info("[redis-rpc] 开始发流: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaServer == null) {
log.info("[redis-rpc] startSendRtp->未找到MediaServer {}", sendRtpItem.getMediaServerId() );
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer");
response.setBody(wvpResult);
return response;
}
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
if (mediaInfo == null) {
log.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "流不在线");
response.setBody(wvpResult);
return response;
}
try {
mediaServerService.startSendRtp(mediaServer, sendRtpItem);
}catch (ControllerException exception) {
log.info("[redis-rpc] 发流失败: {}/{}, 目标地址: {}{} {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getMsg());
WVPResult wvpResult = WVPResult.fail(exception.getCode(), exception.getMsg());
response.setBody(wvpResult);
return response;
}
log.info("[redis-rpc] 发流成功: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
WVPResult wvpResult = WVPResult.success();
response.setBody(wvpResult);
return response;
}
/**
*
*/
@RedisRpcMapping("stopSendRtp")
public RedisRpcResponse stopSendRtp(RedisRpcRequest request) {
String callId = request.getParam().toString();
SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(Response.OK);
if (sendRtpItem == null) {
log.info("[redis-rpc] 停止推流, 未找到redis中的发流信息 key{}", callId);
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
response.setBody(wvpResult);
return response;
}
log.info("[redis-rpc] 停止推流: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaServer == null) {
log.info("[redis-rpc] stopSendRtp->未找到MediaServer {}", sendRtpItem.getMediaServerId() );
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer");
response.setBody(wvpResult);
return response;
}
try {
mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
}catch (ControllerException exception) {
log.info("[redis-rpc] 停止推流失败: {}/{}, 目标地址: {}{} code {}, msg: {}", sendRtpItem.getApp(),
sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getCode(), exception.getMsg() );
response.setBody(WVPResult.fail(exception.getCode(), exception.getMsg()));
return response;
}
log.info("[redis-rpc] 停止推流成功: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
response.setBody(WVPResult.success());
return response;
}
}

View File

@ -3,39 +3,29 @@ package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.sip.message.Response;
/**
* wvprpc RedisRpcConfig
*/
@Slf4j
@Component
public class RedisRpcController {
@RedisRpcController("streamPush")
public class RedisRpcStreamPushController {
@Autowired
private SSRCFactory ssrcFactory;
@ -61,52 +51,18 @@ public class RedisRpcController {
@Autowired
private IGbChannelPlayService channelPlayService;
/**
*
*/
public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) {
String callId = request.getParam().toString();
SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
if (sendRtpItem == null) {
log.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息 callId{}", callId);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
return response;
}
log.info("[redis-rpc] 获取发流的信息: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
// 查询本级是否有这个流
MediaServer mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
if (mediaServerItem == null) {
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
}
// 自平台内容
int localPort = sendRtpServerService.getNextPort(mediaServerItem);
if (localPort == 0) {
log.info("[redis-rpc] getSendRtpItem->服务器端口资源不足" );
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
}
// 写入redis 超时时回复
sendRtpItem.setStatus(1);
sendRtpItem.setServerId(userSetting.getServerId());
sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
if (sendRtpItem.getSsrc() == null) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
sendRtpItem.setSsrc(ssrc);
}
sendRtpServerService.update(sendRtpItem);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
response.setBody(callId);
return response;
private void sendResponse(RedisRpcResponse response){
log.info("[redis-rpc] >> {}", response);
response.setToId(userSetting.getServerId());
RedisRpcMessage message = new RedisRpcMessage();
message.setResponse(response);
redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
}
/**
* 线
*/
@RedisRpcMapping("waitePushStreamOnline")
public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) {
SendRtpInfo sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpInfo.class);
log.info("[redis-rpc] 监听流上线: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
@ -158,6 +114,7 @@ public class RedisRpcController {
/**
* 线
*/
@RedisRpcMapping("onStreamOnlineEvent")
public RedisRpcResponse onStreamOnlineEvent(RedisRpcRequest request) {
StreamInfo streamInfo = JSONObject.parseObject(request.getParam().toString(), StreamInfo.class);
log.info("[redis-rpc] 监听流信息,等待流上线: {}/{}", streamInfo.getApp(), streamInfo.getStream());
@ -191,6 +148,7 @@ public class RedisRpcController {
/**
* 线
*/
@RedisRpcMapping("stopWaitePushStreamOnline")
public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) {
SendRtpInfo sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpInfo.class);
log.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
@ -205,6 +163,7 @@ public class RedisRpcController {
/**
* 线
*/
@RedisRpcMapping("unPushStreamOnlineEvent")
public RedisRpcResponse unPushStreamOnlineEvent(RedisRpcRequest request) {
StreamInfo streamInfo = JSONObject.parseObject(request.getParam().toString(), StreamInfo.class);
log.info("[redis-rpc] 停止监听流上线: {}/{}", streamInfo.getApp(), streamInfo.getStream());
@ -216,123 +175,4 @@ public class RedisRpcController {
return response;
}
/**
*
*/
public RedisRpcResponse startSendRtp(RedisRpcRequest request) {
String callId = request.getParam().toString();
SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
if (sendRtpItem == null) {
log.info("[redis-rpc] 开始发流, 未找到redis中的发流信息 callId{}", callId);
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
response.setBody(wvpResult);
return response;
}
log.info("[redis-rpc] 开始发流: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaServer == null) {
log.info("[redis-rpc] startSendRtp->未找到MediaServer {}", sendRtpItem.getMediaServerId() );
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer");
response.setBody(wvpResult);
return response;
}
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
if (mediaInfo == null) {
log.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "流不在线");
response.setBody(wvpResult);
return response;
}
try {
mediaServerService.startSendRtp(mediaServer, sendRtpItem);
}catch (ControllerException exception) {
log.info("[redis-rpc] 发流失败: {}/{}, 目标地址: {}{} {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getMsg());
WVPResult wvpResult = WVPResult.fail(exception.getCode(), exception.getMsg());
response.setBody(wvpResult);
return response;
}
log.info("[redis-rpc] 发流成功: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
WVPResult wvpResult = WVPResult.success();
response.setBody(wvpResult);
return response;
}
/**
*
*/
public RedisRpcResponse stopSendRtp(RedisRpcRequest request) {
String callId = request.getParam().toString();
SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(Response.OK);
if (sendRtpItem == null) {
log.info("[redis-rpc] 停止推流, 未找到redis中的发流信息 key{}", callId);
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
response.setBody(wvpResult);
return response;
}
log.info("[redis-rpc] 停止推流: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaServer == null) {
log.info("[redis-rpc] stopSendRtp->未找到MediaServer {}", sendRtpItem.getMediaServerId() );
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer");
response.setBody(wvpResult);
return response;
}
try {
mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
}catch (ControllerException exception) {
log.info("[redis-rpc] 停止推流失败: {}/{}, 目标地址: {}{} code {}, msg: {}", sendRtpItem.getApp(),
sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getCode(), exception.getMsg() );
response.setBody(WVPResult.fail(exception.getCode(), exception.getMsg()));
return response;
}
log.info("[redis-rpc] 停止推流成功: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
response.setBody(WVPResult.success());
return response;
}
private void sendResponse(RedisRpcResponse response){
log.info("[redis-rpc] >> {}", response);
response.setToId(userSetting.getServerId());
RedisRpcMessage message = new RedisRpcMessage();
message.setResponse(response);
redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
}
/**
*
*/
public RedisRpcResponse playChannel(RedisRpcRequest request) {
int channelId = Integer.parseInt(request.getParam().toString());
RedisRpcResponse response = request.getResponse();
if (channelId <= 0) {
response.setStatusCode(Response.BAD_REQUEST);
response.setBody("param error");
return response;
}
// 获取对应的设备和通道信息
CommonGBChannel channel = channelService.getOne(channelId);
if (channel == null) {
response.setStatusCode(Response.BAD_REQUEST);
response.setBody("param error");
return response;
}
channelPlayService.play(channel, null, (code, msg, data) ->{
if (code == InviteErrorCode.SUCCESS.getCode()) {
response.setStatusCode(Response.OK);
response.setBody(data);
}else {
response.setStatusCode(code);
}
// 手动发送结果
sendResponse(response);
});
return null;
}
}

View File

@ -0,0 +1,16 @@
package com.genersoft.iot.vmp.service.redisMsg.dto;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface RedisRpcController {
/**
*
*/
String value() default "";
}

View File

@ -0,0 +1,13 @@
package com.genersoft.iot.vmp.service.redisMsg.dto;
import java.lang.annotation.*;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisRpcMapping {
/**
*
*/
String value() default "";
}

View File

@ -1,4 +1,4 @@
package com.genersoft.iot.vmp.gb28181.service.impl;
package com.genersoft.iot.vmp.service.redisMsg.service;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.StreamInfo;
@ -6,8 +6,8 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelRpcPlayService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -16,8 +16,9 @@ import org.springframework.stereotype.Service;
import javax.sip.message.Response;
@Slf4j
@Service("playRpcService")
public class PlayRpcService implements IGbChannelRpcPlayService {
@Service
public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService {
@Autowired
private RedisRpcConfig redisRpcConfig;
@ -36,7 +37,6 @@ public class PlayRpcService implements IGbChannelRpcPlayService {
@Override
public void play(String serverId, Integer channelId, ErrorCallback<StreamInfo> callback) {
log.info("[点播其他WVP的设备] 通道Id:{}", channelId);
RedisRpcRequest request = buildRequest("playChannel", channelId);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout());
@ -52,3 +52,4 @@ public class PlayRpcService implements IGbChannelRpcPlayService {
}
}
}

View File

@ -58,7 +58,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
@Override
public SendRtpInfo getSendRtpItem(String callId) {
RedisRpcRequest request = buildRequest("getSendRtpItem", callId);
RedisRpcRequest request = buildRequest("sendRtp/getSendRtpItem", callId);
RedisRpcResponse response = redisRpcConfig.request(request, 10);
if (response.getBody() == null) {
return null;
@ -69,7 +69,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
@Override
public WVPResult startSendRtp(String callId, SendRtpInfo sendRtpItem) {
log.info("[请求其他WVP] 开始推流wvp{} {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
RedisRpcRequest request = buildRequest("startSendRtp", callId);
RedisRpcRequest request = buildRequest("sendRtp/startSendRtp", callId);
request.setToId(sendRtpItem.getServerId());
RedisRpcResponse response = redisRpcConfig.request(request, 10);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
@ -83,7 +83,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
return WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到发流信息");
}
log.info("[请求其他WVP] 停止推流wvp{} {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
RedisRpcRequest request = buildRequest("stopSendRtp", callId);
RedisRpcRequest request = buildRequest("sendRtp/stopSendRtp", callId);
request.setToId(sendRtpItem.getServerId());
RedisRpcResponse response = redisRpcConfig.request(request, 10);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
@ -94,7 +94,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
log.info("[请求所有WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem);
RedisRpcRequest request = buildRequest("streamPush/waitePushStreamOnline", sendRtpItem);
request.setToId(sendRtpItem.getServerId());
hookSubscribe.addSubscribe(hook, (hookData) -> {
@ -135,7 +135,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
log.info("[停止WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
hookSubscribe.removeSubscribe(hook);
RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem);
RedisRpcRequest request = buildRequest("streamPush/stopWaitePushStreamOnline", sendRtpItem);
request.setToId(sendRtpItem.getServerId());
redisRpcConfig.request(request, 10);
}
@ -147,7 +147,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
log.info("[停止WVP监听流上线] 未找到redis中的发流信息 key{}", callId);
return;
}
RedisRpcRequest request = buildRequest("rtpSendStopped", callId);
RedisRpcRequest request = buildRequest("streamPush/rtpSendStopped", callId);
request.setToId(sendRtpItem.getServerId());
redisRpcConfig.request(request, 10);
}
@ -166,7 +166,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
StreamInfo streamInfoParam = new StreamInfo();
streamInfoParam.setApp(app);
streamInfoParam.setStream(stream);
RedisRpcRequest request = buildRequest("onStreamOnlineEvent", streamInfoParam);
RedisRpcRequest request = buildRequest("streamPush/onStreamOnlineEvent", streamInfoParam);
hookSubscribe.addSubscribe(hook, (hookData) -> {
log.info("[请求所有WVP监听流上线] 监听流上线 {}/{}", app, stream);
if (callback != null) {
@ -198,7 +198,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
StreamInfo streamInfoParam = new StreamInfo();
streamInfoParam.setApp(app);
streamInfoParam.setStream(stream);
RedisRpcRequest request = buildRequest("unPushStreamOnlineEvent", streamInfoParam);
RedisRpcRequest request = buildRequest("streamPush/unPushStreamOnlineEvent", streamInfoParam);
redisRpcConfig.request(request, 10);
}
}

View File

@ -42,6 +42,7 @@ public interface MediaServerMapper {
"create_time,"+
"update_time,"+
"transcode_suffix,"+
"server_id,"+
"hook_alive_interval"+
") VALUES " +
"(" +
@ -74,6 +75,7 @@ public interface MediaServerMapper {
"#{createTime}, " +
"#{updateTime}, " +
"#{transcodeSuffix}, " +
"#{serverId}, " +
"#{hookAliveInterval})")
int add(MediaServer mediaServerItem);
@ -105,6 +107,7 @@ public interface MediaServerMapper {
"<if test=\"recordDay != null\">, record_day=#{recordDay}</if>" +
"<if test=\"recordPath != null\">, record_path=#{recordPath}</if>" +
"<if test=\"transcodeSuffix != null\">, transcode_suffix=#{transcodeSuffix}</if>" +
"<if test=\"serverId != null\">, server_id=#{serverId}</if>" +
"<if test=\"type != null\">, type=#{type}</if>" +
"WHERE id=#{id}"+
" </script>"})
@ -138,32 +141,27 @@ public interface MediaServerMapper {
"<if test=\"type != null\">, type=#{type}</if>" +
"<if test=\"transcodeSuffix != null\">, transcode_suffix=#{transcodeSuffix}</if>" +
"<if test=\"hookAliveInterval != null\">, hook_alive_interval=#{hookAliveInterval}</if>" +
"<if test=\"serverId != null\">, server_id=#{serverId}</if>" +
"WHERE ip=#{ip} and http_port=#{httpPort}"+
" </script>"})
int updateByHostAndPort(MediaServer mediaServerItem);
@Select("SELECT * FROM wvp_media_server WHERE id=#{id}")
MediaServer queryOne(String id);
@Select("SELECT * FROM wvp_media_server WHERE id=#{id} and server_id = #{serverId}")
MediaServer queryOne(@Param("id") String id, @Param("serverId") String serverId);
@Select("SELECT * FROM wvp_media_server")
List<MediaServer> queryAll();
@Select("SELECT * FROM wvp_media_server where server_id = #{serverId}")
List<MediaServer> queryAll(@Param("serverId") String serverId);
@Delete("DELETE FROM wvp_media_server WHERE id=#{id}")
void delOne(String id);
@Delete("DELETE FROM wvp_media_server WHERE id=#{id} and server_id = #{serverId}")
void delOne(String id, @Param("serverId") String serverId);
@Select("DELETE FROM wvp_media_server WHERE ip=#{host} and http_port=#{port}")
void delOneByIPAndPort(@Param("host") String host, @Param("port") int port);
@Select("SELECT * FROM wvp_media_server WHERE ip=#{host} and http_port=#{port} and server_id = #{serverId}")
MediaServer queryOneByHostAndPort(@Param("host") String host, @Param("port") int port, @Param("serverId") String serverId);
@Delete("DELETE FROM wvp_media_server WHERE default_server=true")
int delDefault();
@Select("SELECT * FROM wvp_media_server WHERE default_server=true and server_id = #{serverId}")
MediaServer queryDefault(@Param("serverId") String serverId);
@Select("SELECT * FROM wvp_media_server WHERE ip=#{host} and http_port=#{port}")
MediaServer queryOneByHostAndPort(@Param("host") String host, @Param("port") int port);
@Select("SELECT * FROM wvp_media_server WHERE default_server=true")
MediaServer queryDefault();
@Select("SELECT * FROM wvp_media_server WHERE record_assist_port > 0")
List<MediaServer> queryAllWithAssistPort();
@Select("SELECT * FROM wvp_media_server WHERE record_assist_port > 0 and server_id = #{serverId}")
List<MediaServer> queryAllWithAssistPort(@Param("serverId") String serverId);
}

View File

@ -29,6 +29,7 @@ create table wvp_device
custom_name character varying(255),
sdp_ip character varying(50),
local_ip character varying(50),
server_id character varying(50),
password character varying(255),
as_message_channel bool default false,
keepalive_interval_time integer,
@ -190,6 +191,7 @@ create table wvp_media_server
record_path character varying(255),
record_day integer default 7,
transcode_suffix character varying(255),
server_id character varying(50),
constraint uk_media_server_unique_ip_http_port unique (ip, http_port)
);

View File

@ -29,6 +29,7 @@ create table wvp_device
custom_name character varying(255),
sdp_ip character varying(50),
local_ip character varying(50),
server_id character varying(50),
password character varying(255),
as_message_channel bool default false,
keepalive_interval_time integer,
@ -207,6 +208,7 @@ create table wvp_media_server
record_path character varying(255),
record_day integer default 7,
transcode_suffix character varying(255),
server_id character varying(50),
constraint uk_media_server_unique_ip_http_port unique (ip, http_port)
);