From 9c6765d44ef2ccb06fdaf525a06e564a331ab892 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 16 Apr 2024 22:10:35 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E5=A4=9Awvp=E5=9B=BD?= =?UTF-8?q?=E6=A0=87=E7=BA=A7=E8=81=94=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/conf/redis/RedisMsgListenConfig.java | 18 +- .../iot/vmp/conf/redis/RedisRpcConfig.java | 205 ++++++++++++++++++ .../vmp/conf/redis/bean/RedisRpcMessage.java | 24 ++ .../vmp/conf/redis/bean/RedisRpcRequest.java | 93 ++++++++ .../vmp/conf/redis/bean/RedisRpcResponse.java | 87 ++++++++ .../request/impl/AckRequestProcessor.java | 16 +- .../request/impl/ByeRequestProcessor.java | 14 +- .../request/impl/InviteRequestProcessor.java | 45 +--- .../vmp/media/zlm/ZLMHttpHookListener.java | 4 - .../iot/vmp/media/zlm/ZLMServerFactory.java | 9 + .../iot/vmp/service/IMediaServerService.java | 2 +- .../service/impl/MediaServerServiceImpl.java | 21 +- .../service/redisMsg/IRedisRpcService.java | 16 ++ .../RedisPlatformPushStreamOnlineLister.java | 97 --------- .../RedisPlatformStartSendRtpListener.java | 122 ----------- ...sPlatformWaitPushStreamOnlineListener.java | 106 --------- .../RedisPushStreamCloseResponseListener.java | 99 --------- .../RedisPushStreamResponseListener.java | 76 ------- .../redisMsg/control/RedisRpcController.java | 203 +++++++++++++++++ .../redisMsg/service/RedisRpcServiceImpl.java | 100 +++++++++ .../storager/impl/RedisCatchStorageImpl.java | 7 +- 21 files changed, 789 insertions(+), 575 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java create mode 100644 src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java create mode 100644 src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java create mode 100644 src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java delete mode 100755 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java delete mode 100755 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java delete mode 100755 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java delete mode 100755 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java delete mode 100755 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java index 5385efab..5ddaed33 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java @@ -34,23 +34,13 @@ public class RedisMsgListenConfig { @Autowired private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener; - @Autowired - private RedisPushStreamResponseListener redisPushStreamResponseListener; @Autowired private RedisCloseStreamMsgListener redisCloseStreamMsgListener; - @Autowired - private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener; @Autowired - private RedisPlatformWaitPushStreamOnlineListener redisPlatformWaitPushStreamOnlineListener; - - @Autowired - private RedisPlatformStartSendRtpListener redisPlatformStartSendRtpListener; - - @Autowired - private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister; + private RedisRpcConfig redisRpcConfig; /** @@ -69,12 +59,8 @@ public class RedisMsgListenConfig { container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); - container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); - container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED)); - container.addMessageListener(redisPlatformStartSendRtpListener, new PatternTopic(VideoManagerConstants.START_SEND_PUSH_STREAM)); - container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED)); - container.addMessageListener(redisPlatformPushStreamOnlineLister, new PatternTopic(VideoManagerConstants.PUSH_STREAM_ONLINE)); + container.addMessageListener(redisRpcConfig, new PatternTopic(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY)); return container; } } diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java new file mode 100644 index 00000000..545fdc11 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java @@ -0,0 +1,205 @@ +package com.genersoft.iot.vmp.conf.redis; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; +import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + +@Component +public class RedisRpcConfig implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisRpcConfig.class); + + public final static String REDIS_REQUEST_CHANNEL_KEY = "WVP_REDIS_REQUEST_CHANNEL_KEY"; + + private final Random random = new Random(); + + @Autowired + private UserSetting userSetting; + + @Autowired + private RedisRpcController redisRpcController; + + @Autowired + private RedisTemplate redisTemplate; + + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Override + public void onMessage(Message message, byte[] pattern) { + boolean isEmpty = taskQueue.isEmpty(); + taskQueue.offer(message); + if (isEmpty) { + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + try { + RedisRpcMessage redisRpcMessage = JSON.parseObject(new String(msg.getBody()), RedisRpcMessage.class); + if (redisRpcMessage.getRequest() != null) { + handlerRequest(redisRpcMessage.getRequest()); + } else if (redisRpcMessage.getResponse() != null){ + handlerResponse(redisRpcMessage.getResponse()); + } else { + logger.error("[redis rpc 解析失败] {}", JSON.toJSONString(redisRpcMessage)); + } + } catch (Exception e) { + logger.error("[redis rpc 解析异常] ", e); + } + } + }); + } + } + + private void handlerResponse(RedisRpcResponse response) { + if (userSetting.getServerId().equals(response.getToId())) { + return; + } + response(response); + } + + private void handlerRequest(RedisRpcRequest request) { + try { + if (userSetting.getServerId().equals(request.getFromId())) { + return; + } + Method method = getMethod(request.getUri()); + // 没有携带目标ID的可以理解为哪个wvp有结果就哪个回复,携带目标ID,但是如果是不存在的uri则直接回复404 + if (userSetting.getServerId().equals(request.getToId())) { + if (method == null) { + // 回复404结果 + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(404); + sendResponse(response); + return; + } + RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request); + if(response != null) { + sendResponse(response); + } + }else { + if (method == null) { + return; + } + RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request); + if (response != null) { + sendResponse(response); + } + } + }catch (InvocationTargetException | IllegalAccessException e) { + logger.error("[redis rpc ] 处理请求失败 ", e); + } + + } + + private Method getMethod(String name) { + // 启动后扫描所有的路径注解 + Method[] methods = redisRpcController.getClass().getMethods(); + for (Method method : methods) { + if (method.getName().equals(name)) { + return method; + } + } + return null; + } + + private void sendResponse(RedisRpcResponse response){ + response.setToId(userSetting.getServerId()); + RedisRpcMessage message = new RedisRpcMessage(); + message.setResponse(response); + redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message); + } + + private void sendRequest(RedisRpcRequest request){ + RedisRpcMessage message = new RedisRpcMessage(); + message.setRequest(request); + redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message); + } + + + private final Map> topicSubscribers = new ConcurrentHashMap<>(); + private final Map> callbacks = new ConcurrentHashMap<>(); + + public RedisRpcResponse request(RedisRpcRequest request, int timeOut) { + request.setSn((long) random.nextInt(1000) + 1); + SynchronousQueue subscribe = subscribe(request.getSn()); + try { + sendRequest(request); + return subscribe.poll(timeOut, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e); + } finally { + this.unsubscribe(request.getSn()); + } + return null; + } + + public void request(RedisRpcRequest request, CommonCallback callback) { + request.setSn((long) random.nextInt(1000) + 1); + setCallback(request.getSn(), callback); + sendRequest(request); + } + + public Boolean response(RedisRpcResponse response) { + SynchronousQueue queue = topicSubscribers.get(response.getSn()); + CommonCallback callback = callbacks.get(response.getSn()); + if (queue != null) { + try { + return queue.offer(response, 2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.error("{}", e.getMessage(), e); + } + }else if (callback != null) { + callback.run(response); + callbacks.remove(response.getSn()); + } + return false; + } + + private void unsubscribe(long key) { + topicSubscribers.remove(key); + } + + + private SynchronousQueue subscribe(long key) { + SynchronousQueue queue = null; + if (!topicSubscribers.containsKey(key)) + topicSubscribers.put(key, queue = new SynchronousQueue<>()); + return queue; + } + + private void setCallback(long key, CommonCallback callback) { + if (!callbacks.containsKey(key)) { + callbacks.put(key, callback); + } + + } + + public void removeCallback(long key) { + callbacks.remove(key); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java new file mode 100644 index 00000000..061df6f3 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java @@ -0,0 +1,24 @@ +package com.genersoft.iot.vmp.conf.redis.bean; + +public class RedisRpcMessage { + + private RedisRpcRequest request; + + private RedisRpcResponse response; + + public RedisRpcRequest getRequest() { + return request; + } + + public void setRequest(RedisRpcRequest request) { + this.request = request; + } + + public RedisRpcResponse getResponse() { + return response; + } + + public void setResponse(RedisRpcResponse response) { + this.response = response; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java new file mode 100644 index 00000000..e31eb454 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java @@ -0,0 +1,93 @@ +package com.genersoft.iot.vmp.conf.redis.bean; + +/** + * 通过redis发送请求 + */ +public class RedisRpcRequest { + + /** + * 来自的WVP ID + */ + private String fromId; + + + /** + * 目标的WVP ID + */ + private String toId; + + /** + * 序列号 + */ + private long sn; + + /** + * 访问的路径 + */ + private String uri; + + /** + * 参数 + */ + private Object param; + + public String getFromId() { + return fromId; + } + + public void setFromId(String fromId) { + this.fromId = fromId; + } + + public String getToId() { + return toId; + } + + public void setToId(String toId) { + this.toId = toId; + } + + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + public Object getParam() { + return param; + } + + public void setParam(Object param) { + this.param = param; + } + + public long getSn() { + return sn; + } + + public void setSn(long sn) { + this.sn = sn; + } + + @Override + public String toString() { + return "RedisRpcRequest{" + + "fromId='" + fromId + '\'' + + ", toId='" + toId + '\'' + + ", sn='" + sn + '\'' + + ", uri='" + uri + '\'' + + ", param=" + param + + '}'; + } + + public RedisRpcResponse getResponse() { + RedisRpcResponse response = new RedisRpcResponse(); + response.setFromId(fromId); + response.setToId(toId); + response.setSn(sn); + response.setUri(uri); + return response; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java new file mode 100644 index 00000000..ef948166 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java @@ -0,0 +1,87 @@ +package com.genersoft.iot.vmp.conf.redis.bean; + +/** + * 通过redis发送回复 + */ +public class RedisRpcResponse { + + /** + * 来自的WVP ID + */ + private String fromId; + + + /** + * 目标的WVP ID + */ + private String toId; + + + /** + * 序列号 + */ + private long sn; + + /** + * 状态码 + */ + private int statusCode; + + /** + * 访问的路径 + */ + private String uri; + + /** + * 参数 + */ + private Object body; + + public String getFromId() { + return fromId; + } + + public void setFromId(String fromId) { + this.fromId = fromId; + } + + public String getToId() { + return toId; + } + + public void setToId(String toId) { + this.toId = toId; + } + + public long getSn() { + return sn; + } + + public void setSn(long sn) { + this.sn = sn; + } + + public int getStatusCode() { + return statusCode; + } + + public void setStatusCode(int statusCode) { + this.statusCode = statusCode; + } + + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + public Object getBody() { + return body; + } + + public void setBody(Object body) { + this.body = body; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index af7db2e3..b748451c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -15,9 +15,11 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; -import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -54,6 +56,8 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private IRedisRpcService redisRpcService; @Autowired private UserSetting userSetting; @@ -113,7 +117,15 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In if (parentPlatform != null) { Map param = getSendRtpParam(sendRtpItem); if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { - redisCatchStorage.sendStartSendRtp(sendRtpItem); +// redisCatchStorage.sendStartSendRtp(sendRtpItem); + WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem); + if (wvpResult.getCode() == 0) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(), + sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(), + sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); + redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel); + } } else { JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param); if (startSendRtpStreamResult != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 69f81425..2c134fd0 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -18,7 +18,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; -import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; @@ -97,6 +97,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private IStreamPushService pushService; + @Autowired + private IRedisRpcService redisRpcService; + @Override public void afterPropertiesSet() throws Exception { @@ -134,13 +137,8 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { // 查询这路流是否是本平台的 StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream()); - if (push!= null && !push.isSelf()) { - // 不是本平台的就发送redis消息让其他wvp停止发流 - ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); - if (platform != null) { - RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId()); -// redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg); - } + if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { + redisRpcService.stopSendRtp(sendRtpItem); }else { MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index f12f38ac..59ff50c3 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -19,7 +19,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; -import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; @@ -29,7 +29,6 @@ import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -86,7 +85,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements private IRedisCatchStorage redisCatchStorage; @Autowired - private IInviteStreamService inviteStreamService; + private IRedisRpcService redisRpcService; @Autowired private SSRCFactory ssrcFactory; @@ -94,9 +93,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private DynamicTask dynamicTask; - @Autowired - private RedisPushStreamResponseListener redisPushStreamResponseListener; - @Autowired private IPlayService playService; @@ -121,9 +117,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private UserSetting userSetting; - @Autowired - private RedisPlatformPushStreamOnlineLister mediaListManager; - @Autowired private SipConfig config; @@ -594,15 +587,17 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setSessionName(sessionName); if ("push".equals(gbStream.getStreamType())) { + sendRtpItem.setPlayType(InviteStreamType.PUSH); if (streamPushItem != null) { // 从redis查询是否正在接收这个推流 OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); - if (pushListItem != null) { sendRtpItem.setServerId(pushListItem.getSeverId()); + sendRtpItem.setMediaServerId(pushListItem.getMediaServerId()); + StreamPushItem transform = streamPushService.transform(pushListItem); transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); - // 推流状态 + // 开始推流 sendPushStream(sendRtpItem, mediaServerItem, platform, request); }else { if (!platform.isStartOfflinePush()) { @@ -702,8 +697,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } // 写入redis, 超时时回复 sendRtpItem.setStatus(1); - sendRtpItem.setFromTag(request.getFromTag()); - sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); SIPResponse response = sendStreamAck(request, sendRtpItem, platform); if (response != null) { sendRtpItem.setToTag(response.getToTag()); @@ -714,7 +707,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setSsrc(ssrc); } redisCatchStorage.updateSendRTPSever(sendRtpItem); - } else { // 不在线 拉起 notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); @@ -769,18 +761,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream()); try { - redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream()); - mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); responseAck(request, Response.REQUEST_TIMEOUT); // 超时 } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("未处理的异常 ", e); } }, userSetting.getPlatformPlayTimeout()); - redisCatchStorage.addWaiteSendRtpItem(sendRtpItem, userSetting.getPlatformPlayTimeout()); - // 添加上线的通知 - mediaListManager.addChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream(), (sendRtpItemFromRedis) -> { + // + redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemFromRedis) -> { dynamicTask.stop(sendRtpItem.getCallId()); - redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream()); if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { int localPort = sendRtpPortManager.getNextPort(mediaServerItem); @@ -813,19 +801,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 其他平台内容 otherWvpPushStream(sendRtpItemFromRedis, request, platform); } - }); - // 添加回复的拒绝或者错误的通知 - redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { - if (response.getCode() != 0) { - dynamicTask.stop(sendRtpItem.getCallId()); - mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); - try { - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage()); - } - } }); } @@ -836,12 +812,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements */ private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) { logger.info("[级联点播]直播流来自其他平台,发送redis消息"); - // 发送redis消息 - redisCatchStorage.sendStartSendRtp(sendRtpItem); + sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem); // 写入redis, 超时时回复 sendRtpItem.setStatus(1); - sendRtpItem.setCallId(request.getCallIdHeader().getCallId()); - sendRtpItem.setFromTag(request.getFromTag()); SIPResponse response = sendStreamAck(request, sendRtpItem, platform); if (response != null) { sendRtpItem.setToTag(response.getToTag()); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 346b4a2f..32bf76a8 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -23,7 +23,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -100,9 +99,6 @@ public class ZLMHttpHookListener { @Autowired private EventPublisher eventPublisher; - @Autowired - private RedisPlatformPushStreamOnlineLister zlmMediaListManager; - @Autowired private ZlmHttpHookSubscribe subscribe; diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java index 027e990e..ec24abea 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java @@ -365,4 +365,13 @@ public class ZLMServerFactory { } return result; } + + public JSONObject stopSendRtpStream(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem) { + Map param = new HashMap<>(); + param.put("vhost", "__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStream()); + param.put("ssrc", sendRtpItem.getSsrc()); + return zlmresTfulUtils.startSendRtp(mediaServerItem, param); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java index 2e6151d6..972db32f 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java @@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.vmanager.bean.RecordFile; import java.util.List; @@ -97,4 +96,5 @@ public interface IMediaServerService { List getAllWithAssistPort(); + MediaServerItem getMediaServerByAppAndStream(String app, String stream); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 190d665b..aeb0dc80 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -25,7 +25,6 @@ import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.JsonUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import com.genersoft.iot.vmp.vmanager.bean.RecordFile; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; @@ -36,19 +35,15 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.jdbc.datasource.DataSourceTransactionManager; -import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; -import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import java.io.File; import java.time.LocalDateTime; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; /** * 媒体服务器节点管理 @@ -751,6 +746,22 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public List getAllWithAssistPort() { + return mediaServerMapper.queryAllWithAssistPort(); } + + @Override + public MediaServerItem getMediaServerByAppAndStream(String app, String stream) { + List mediaServerItemList = getAllOnline(); + if (mediaServerItemList.isEmpty()) { + return null; + } + for (MediaServerItem mediaServerItem : mediaServerItemList) { + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); + if (streamReady) { + return mediaServerItem; + } + } + return null; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java new file mode 100644 index 00000000..a601ae93 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -0,0 +1,16 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; + +public interface IRedisRpcService { + + SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem); + + WVPResult startSendRtp(SendRtpItem sendRtpItem); + + void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback callback); + + WVPResult stopSendRtp(SendRtpItem sendRtpItem); +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java deleted file mode 100755 index 8ad08071..00000000 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java +++ /dev/null @@ -1,97 +0,0 @@ -package com.genersoft.iot.vmp.service.redisMsg; - -import com.alibaba.fastjson2.JSON; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.GbStream; -import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.*; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.IStreamProxyService; -import com.genersoft.iot.vmp.service.IStreamPushService; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; -import com.genersoft.iot.vmp.utils.DateUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; - -import java.text.ParseException; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; - -/** - * @author lin - */ -@Component -public class RedisPlatformPushStreamOnlineLister implements MessageListener { - - private final Logger logger = LoggerFactory.getLogger("RedisPlatformPushStreamOnlineLister"); - - private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - - /** - * 通过redis消息接收流上线的通知,如果本机由对这个流的监听,则回调 - */ - @Override - public void onMessage(Message message, byte[] pattern) { - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class); - sendStreamEvent(sendRtpItem); - } - }); - } - } - - private final Map channelOnPublishEvents = new ConcurrentHashMap<>(); - - public void sendStreamEvent(SendRtpItem sendRtpItem) { - // 查看推流状态 - ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); - if (channelOnlineEventLister != null) { - try { - channelOnlineEventLister.run(sendRtpItem); - } catch (ParseException e) { - logger.error("sendStreamEvent: ", e); - } - removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); - } - } - - public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) { - this.channelOnPublishEvents.put(app + "_" + stream, callback); - } - - public void removedChannelOnlineEventLister(String app, String stream) { - this.channelOnPublishEvents.remove(app + "_" + stream); - } - - public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) { - return this.channelOnPublishEvents.get(app + "_" + stream); - } - - -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java deleted file mode 100755 index 25dd334c..00000000 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java +++ /dev/null @@ -1,122 +0,0 @@ -package com.genersoft.iot.vmp.service.redisMsg; - -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentLinkedQueue; - -/** - * 收到消息后开始给上级发流 - * @author lin - */ -@Component -public class RedisPlatformStartSendRtpListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisPlatformStartSendRtpListener.class); - - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - - @Autowired - private ZLMServerFactory zlmServerFactory; - - @Autowired - private IMediaServerService mediaServerService; - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - - - @Override - public void onMessage(Message message, byte[] bytes) { - logger.info("[REDIS消息-收到上级等到设备推流的redis消息]: {}", new String(message.getBody())); - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class); - sendRtpItem.getMediaServerId(); - MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - if (mediaServer == null) { - return; - } - Map sendRtpParam = getSendRtpParam(sendRtpItem); - sendRtp(sendRtpItem, mediaServer, sendRtpParam); - - }catch (Exception e) { - logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); - logger.error("[REDIS消息-请求推流结果] 异常内容: ", e); - } - } - }); - } - } - - private Map getSendRtpParam(SendRtpItem sendRtpItem) { - String isUdp = sendRtpItem.isTcp() ? "0" : "1"; - Map param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStream()); - param.put("ssrc", sendRtpItem.getSsrc()); - param.put("dst_url",sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - param.put("src_port", sendRtpItem.getLocalPort()); - param.put("pt", sendRtpItem.getPt()); - param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); - param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - param.put("is_udp", isUdp); - if (!sendRtpItem.isTcp()) { - // udp模式下开启rtcp保活 - param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); - } - return param; - } - - private JSONObject sendRtp(SendRtpItem sendRtpItem, MediaServerItem mediaInfo, Map param){ - JSONObject startSendRtpStreamResult = null; - if (sendRtpItem.getLocalPort() != 0) { - if (sendRtpItem.isTcpActive()) { - startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param); - }else { - param.put("dst_url", sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param); - } - }else { - if (sendRtpItem.isTcpActive()) { - startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param); - }else { - param.put("dst_url", sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param); - } - } - return startSendRtpStreamResult; - - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java deleted file mode 100755 index 25600a23..00000000 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java +++ /dev/null @@ -1,106 +0,0 @@ -package com.genersoft.iot.vmp.service.redisMsg; - -import com.alibaba.fastjson2.JSON; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; - -import java.util.concurrent.ConcurrentLinkedQueue; - -/** - * 上级等到设备推流的redis消息 - * @author lin - */ -@Component -public class RedisPlatformWaitPushStreamOnlineListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisPlatformWaitPushStreamOnlineListener.class); - - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - - @Autowired - private UserSetting userSetting; - - @Autowired - private IRedisCatchStorage redisCatchStorage; - - @Autowired - private ZlmHttpHookSubscribe hookSubscribe; - - @Autowired - private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister; - - @Autowired - private SSRCFactory ssrcFactory; - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - - - /** - * 当上级点播时,这里负责监听等到流上线,流上线后如果是在当前服务则直接回调,如果是其他wvp,则由redis消息进行通知 - */ - @Override - public void onMessage(Message message, byte[] bytes) { - logger.info("[REDIS消息-收到上级等到设备推流的redis消息]: {}", new String(message.getBody())); - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class); - if (messageForPushChannel == null - || ObjectUtils.isEmpty(messageForPushChannel.getApp()) - || ObjectUtils.isEmpty(messageForPushChannel.getStream()) - || userSetting.getServerId().equals(messageForPushChannel.getServerId())){ - continue; - } - - // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 - HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( - messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp", - null); - hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { - // 读取redis中的上级点播信息,生成sendRtpItm发送出去 - SendRtpItem sendRtpItem = redisCatchStorage.getWaiteSendRtpItem(messageForPushChannel.getApp(), messageForPushChannel.getStream()); - if (sendRtpItem.getSsrc() == null) { - // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 - String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId()); - sendRtpItem.setSsrc(ssrc); - sendRtpItem.setMediaServerId(mediaServerItemInUse.getId()); - sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); - redisPlatformPushStreamOnlineLister.sendStreamEvent(sendRtpItem); - // 通知其他wvp, 由RedisPlatformPushStreamOnlineLister接收此监听。 - redisCatchStorage.sendPushStreamOnline(sendRtpItem); - } - }); - - - }catch (Exception e) { - logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); - logger.error("[REDIS消息-请求推流结果] 异常内容: ", e); - } - } - }); - } - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java deleted file mode 100755 index a0315732..00000000 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java +++ /dev/null @@ -1,99 +0,0 @@ -package com.genersoft.iot.vmp.service.redisMsg; - -import com.alibaba.fastjson2.JSON; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.IStreamPushService; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.stereotype.Component; - -import javax.sip.InvalidArgumentException; -import javax.sip.SipException; -import java.text.ParseException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * 接收redis发送的结束推流请求 - * @author lin - */ -@Component -public class RedisPushStreamCloseResponseListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class); - - @Autowired - private IStreamPushService streamPushService; - - @Autowired - private IRedisCatchStorage redisCatchStorage; - - @Autowired - private IVideoManagerStorage storager; - - @Autowired - private ISIPCommanderForPlatform commanderFroPlatform; - - @Autowired - private UserSetting userSetting; - - @Autowired - private IMediaServerService mediaServerService; - - @Autowired - private ZLMServerFactory zlmServerFactory; - - - private Map responseEvents = new ConcurrentHashMap<>(); - - public interface PushStreamResponseEvent{ - void run(MessageForPushChannelResponse response); - } - - @Override - public void onMessage(Message message, byte[] bytes) { - logger.info("[REDIS消息-推流结束]: {}", new String(message.getBody())); - MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class); - StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream()); - if (push != null) { - List sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId( - push.getGbId()); - if (!sendRtpItems.isEmpty()) { - for (SendRtpItem sendRtpItem : sendRtpItems) { - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); - if (parentPlatform != null) { - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream()); - try { - commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); - } - } - } - } - } - - } - - public void addEvent(String app, String stream, PushStreamResponseEvent callback) { - responseEvents.put(app + stream, callback); - } - - public void removeEvent(String app, String stream) { - responseEvents.remove(app + stream); - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java deleted file mode 100755 index c90771be..00000000 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java +++ /dev/null @@ -1,76 +0,0 @@ -package com.genersoft.iot.vmp.service.redisMsg; - -import com.alibaba.fastjson2.JSON; -import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; - -/** - * 接收redis返回的推流结果 - * @author lin - */ -@Component -public class RedisPushStreamResponseListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class); - - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - - - private Map responseEvents = new ConcurrentHashMap<>(); - - public interface PushStreamResponseEvent{ - void run(MessageForPushChannelResponse response); - } - - @Override - public void onMessage(Message message, byte[] bytes) { - logger.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody())); - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); - if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ - logger.info("[REDIS消息-请求推流结果]:参数不全"); - continue; - } - // 查看正在等待的invite消息 - if (responseEvents.get(response.getApp() + response.getStream()) != null) { - responseEvents.get(response.getApp() + response.getStream()).run(response); - } - }catch (Exception e) { - logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); - logger.error("[REDIS消息-请求推流结果] 异常内容: ", e); - } - } - }); - } - } - - public void addEvent(String app, String stream, PushStreamResponseEvent callback) { - responseEvents.put(app + stream, callback); - } - - public void removeEvent(String app, String stream) { - responseEvents.remove(app + stream); - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java new file mode 100644 index 00000000..7a81eabe --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java @@ -0,0 +1,203 @@ +package com.genersoft.iot.vmp.service.redisMsg.control; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; +import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +/** + * 其他wvp发起的rpc调用,这里的方法被 RedisRpcConfig 通过反射寻找对应的方法名称调用 + */ +@Component +public class RedisRpcController { + + private final static Logger logger = LoggerFactory.getLogger(RedisRpcController.class); + + @Autowired + private SSRCFactory ssrcFactory; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private SendRtpPortManager sendRtpPortManager; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private UserSetting userSetting; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + + @Autowired + private ZLMServerFactory zlmServerFactory; + + + @Autowired + private RedisTemplate redisTemplate; + + + /** + * 获取发流的信息 + */ + public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) { + SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + logger.info("[redis-rpc] 获取发流的信息: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + // 查询本级是否有这个流 + MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); + if (mediaServerItem == null) { + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + } + // 自平台内容 + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { + logger.info("[redis-rpc] getSendRtpItem->服务器端口资源不足" ); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + } + // 写入redis, 超时时回复 + sendRtpItem.setStatus(1); + sendRtpItem.setServerId(userSetting.getServerId()); + sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); + if (sendRtpItem.getSsrc() == null) { + // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + response.setBody(sendRtpItem); + return response; + } + + /** + * 监听流上线 + */ + public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) { + SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + logger.info("[redis-rpc] 监听流上线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + // 查询本级是否有这个流 + MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); + if (mediaServerItem != null) { + logger.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + RedisRpcResponse response = request.getResponse(); + response.setBody(sendRtpItem); + response.setStatusCode(200); + } + // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 + HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( + sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); + + hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { + logger.info("[redis-rpc] 监听流上线,流已上线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + // 读取redis中的上级点播信息,生成sendRtpItm发送出去 + if (sendRtpItem.getSsrc() == null) { + // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId()); + sendRtpItem.setSsrc(ssrc); + } + sendRtpItem.setMediaServerId(mediaServerItemInUse.getId()); + sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); + sendRtpItem.setServerId(userSetting.getServerId()); + RedisRpcResponse response = request.getResponse(); + response.setBody(sendRtpItem); + response.setStatusCode(200); + // 手动发送结果 + sendResponse(response); + + }); + return null; + } + + + /** + * 开始发流 + */ + public RedisRpcResponse startSendRtp(RedisRpcRequest request) { + SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaServerItem == null) { + logger.info("[redis-rpc] startSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() ); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + } + + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); + if (!streamReady) { + logger.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + } + JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + if (jsonObject.getInteger("code") == 0) { + WVPResult wvpResult = WVPResult.success(); + response.setBody(wvpResult); + }else { + WVPResult wvpResult = WVPResult.fail(jsonObject.getInteger("code"), jsonObject.getString("msg")); + response.setBody(wvpResult); + } + return response; + } + + /** + * 停止发流 + */ + public RedisRpcResponse stopSendRtp(RedisRpcRequest request) { + SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class); + logger.info("[redis-rpc] 停止推流: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaServerItem == null) { + logger.info("[redis-rpc] stopSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() ); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + } + JSONObject jsonObject = zlmServerFactory.stopSendRtpStream(mediaServerItem, sendRtpItem); + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(200); + if (jsonObject.getInteger("code") == 0) { + logger.info("[redis-rpc] 停止推流成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + WVPResult wvpResult = WVPResult.success(); + response.setBody(wvpResult); + }else { + int code = jsonObject.getInteger("code"); + String msg = jsonObject.getString("msg"); + logger.info("[redis-rpc] 停止推流失败: {}/{}, code: {}, msg: {}", sendRtpItem.getApp(), sendRtpItem.getStream(),code, msg ); + WVPResult wvpResult = WVPResult.fail(code, msg); + response.setBody(wvpResult); + } + return response; + } + + private void sendResponse(RedisRpcResponse response){ + response.setToId(userSetting.getServerId()); + RedisRpcMessage message = new RedisRpcMessage(); + message.setResponse(response); + redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java new file mode 100644 index 00000000..e9a00a98 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -0,0 +1,100 @@ +package com.genersoft.iot.vmp.service.redisMsg.service; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class RedisRpcServiceImpl implements IRedisRpcService { + + + @Autowired + private RedisRpcConfig redisRpcConfig; + + @Autowired + private UserSetting userSetting; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + + @Autowired + private SSRCFactory ssrcFactory; + + private RedisRpcRequest buildRequest(String uri, Object param) { + RedisRpcRequest request = new RedisRpcRequest(); + request.setFromId(userSetting.getServerId()); + request.setParam(param); + request.setUri(uri); + return request; + } + + @Override + public SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem) { + + RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItem); + RedisRpcResponse response = redisRpcConfig.request(request, 10); + return JSON.parseObject(response.getBody().toString(), SendRtpItem.class); + } + + @Override + public WVPResult startSendRtp(SendRtpItem sendRtpItem) { + RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItem); + request.setToId(sendRtpItem.getServerId()); + RedisRpcResponse response = redisRpcConfig.request(request, 10); + return JSON.parseObject(response.getBody().toString(), WVPResult.class); + } + + @Override + public WVPResult stopSendRtp(SendRtpItem sendRtpItem) { + RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItem); + request.setToId(sendRtpItem.getServerId()); + RedisRpcResponse response = redisRpcConfig.request(request, 10); + return JSON.parseObject(response.getBody().toString(), WVPResult.class); + } + + @Override + public void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback callback) { + // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 + HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( + sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); + hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { + + // 读取redis中的上级点播信息,生成sendRtpItm发送出去 + if (sendRtpItem.getSsrc() == null) { + // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId()); + sendRtpItem.setSsrc(ssrc); + } + sendRtpItem.setMediaServerId(mediaServerItemInUse.getId()); + sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); + sendRtpItem.setServerId(userSetting.getServerId()); + if (callback != null) { + callback.run(sendRtpItem); + } + hookSubscribe.removeSubscribe(hook); + }); + RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem); + request.setToId(sendRtpItem.getServerId()); + redisRpcConfig.request(request, response -> { + SendRtpItem sendRtpItemFromOther = JSON.parseObject(response.getBody().toString(), SendRtpItem.class); + if (callback != null) { + callback.run(sendRtpItemFromOther); + } + }); + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 70888377..60ebfab0 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -682,19 +682,20 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout) { String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream(); - redisTemplate.opsForValue().set(key, platformPlayTimeout); + redisTemplate.opsForValue().set(key, sendRtpItem); } @Override public SendRtpItem getWaiteSendRtpItem(String app, String stream) { String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream; - return (SendRtpItem)redisTemplate.opsForValue().get(key); + return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class); } @Override public void sendStartSendRtp(SendRtpItem sendRtpItem) { String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream(); - redisTemplate.opsForValue().set(key, JSON.toJSONString(sendRtpItem)); + logger.info("[redis发送通知] 通知其他WVP推流 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId()); + redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem)); } @Override