diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index d19b8f05..af574b93 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -72,6 +72,8 @@ public class VideoManagerConstants { public static final String REGISTER_EXPIRE_TASK_KEY_PREFIX = "VMP_device_register_expire_"; public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_"; + public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:"; + public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:"; diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java index c14ebcdd..fb88f54a 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java @@ -31,9 +31,6 @@ public class RedisMsgListenConfig { @Autowired private RedisStreamMsgListener redisStreamMsgListener; - @Autowired - private RedisGbPlayMsgListener redisGbPlayMsgListener; - @Autowired private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener; @@ -49,6 +46,12 @@ public class RedisMsgListenConfig { @Autowired private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener; + @Autowired + private RedisPlatformWaitPushStreamOnlineListener redisPlatformWaitPushStreamOnlineListener; + + @Autowired + private RedisPlatformStartSendRtpListener redisPlatformStartSendRtpListener; + /** * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 @@ -65,12 +68,13 @@ public class RedisMsgListenConfig { container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS)); container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH")); - container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY)); container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED)); + container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.WAITE_SEND_PUSH_STREAM)); + container.addMessageListener(redisPlatformStartSendRtpListener, new PatternTopic(VideoManagerConstants.START_SEND_PUSH_STREAM)); return container; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index 30193d27..c0507df0 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -22,6 +22,11 @@ public class SendRtpItem { */ private String platformId; + /** + * 平台名称 + */ + private String platformName; + /** * 对应设备id */ @@ -61,6 +66,11 @@ public class SendRtpItem { */ private boolean tcpActive; + /** + * 自己推流使用的IP + */ + private String localIp; + /** * 自己推流使用的端口 */ @@ -306,6 +316,22 @@ public class SendRtpItem { this.receiveStream = receiveStream; } + public String getPlatformName() { + return platformName; + } + + public void setPlatformName(String platformName) { + this.platformName = platformName; + } + + public String getLocalIp() { + return localIp; + } + + public void setLocalIp(String localIp) { + this.localIp = localIp; + } + @Override public String toString() { return "SendRtpItem{" + @@ -313,6 +339,7 @@ public class SendRtpItem { ", port=" + port + ", ssrc='" + ssrc + '\'' + ", platformId='" + platformId + '\'' + + ", platformName='" + platformName + '\'' + ", deviceId='" + deviceId + '\'' + ", app='" + app + '\'' + ", channelId='" + channelId + '\'' + @@ -320,6 +347,7 @@ public class SendRtpItem { ", stream='" + stream + '\'' + ", tcp=" + tcp + ", tcpActive=" + tcpActive + + ", localIp=" + localIp + ", localPort=" + localPort + ", mediaServerId='" + mediaServerId + '\'' + ", serverId='" + serverId + '\'' + diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 96b8b11e..35399227 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -18,6 +18,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; @@ -28,7 +29,6 @@ import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -127,13 +127,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private SipConfig config; - - @Autowired - private RedisGbPlayMsgListener redisGbPlayMsgListener; - @Autowired private VideoStreamSessionManager streamSession; + @Autowired + private SendRtpPortManager sendRtpPortManager; + @Override public void afterPropertiesSet() throws Exception { @@ -577,21 +576,40 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements }else { ssrc = gb28181Sdp.getSsrc(); } + SendRtpItem sendRtpItem = new SendRtpItem(); + sendRtpItem.setTcpActive(tcpActive); + sendRtpItem.setTcp(mediaTransmissionTCP); + sendRtpItem.setRtcp(platform.isRtcp()); + sendRtpItem.setSsrc(ssrc); + sendRtpItem.setPlatformName(platform.getName()); + sendRtpItem.setPlatformId(platform.getServerGBId()); + sendRtpItem.setMediaServerId(mediaServerItem.getId()); + sendRtpItem.setChannelId(channelId); + sendRtpItem.setIp(addressStr); + sendRtpItem.setPort(port); + sendRtpItem.setUsePs(true); + sendRtpItem.setApp(gbStream.getApp()); + sendRtpItem.setStream(gbStream.getStream()); + sendRtpItem.setCallId(callIdHeader.getCallId()); + sendRtpItem.setFromTag(request.getFromTag()); + sendRtpItem.setOnlyAudio(false); + sendRtpItem.setPlayType(InviteStreamType.PUSH); + sendRtpItem.setStatus(0); if ("push".equals(gbStream.getStreamType())) { if (streamPushItem != null) { // 从redis查询是否正在接收这个推流 OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); + + sendRtpItem.setServerId(pushListItem.getSeverId()); if (pushListItem != null) { StreamPushItem transform = streamPushService.transform(pushListItem); transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); // 推流状态 - pushStream(evt, request, gbStream, transform, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + pushStream(sendRtpItem, mediaServerItem, platform, request); }else { // 未推流 拉起 - notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } } else if ("proxy".equals(gbStream.getStreamType())) { @@ -601,8 +619,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } else { //开启代理拉流 - notifyStreamOnline(evt, request, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } @@ -659,8 +676,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setStatus(1); sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setFromTag(request.getFromTag()); + sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); - SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); if (response != null) { sendRtpItem.setToTag(response.getToTag()); } @@ -670,19 +688,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } - private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServerItem mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { + private void pushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { // 推流 - if (streamPushItem.isSelf()) { - Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + if (sendRtpItem.getServerId().equals(userSetting.getServerId())) { + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); if (streamReady != null && streamReady) { // 自平台内容 - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); - - if (sendRtpItem == null) { + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { logger.warn("服务器端口资源不足"); try { responseAck(request, Response.BUSY_HERE); @@ -691,16 +704,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } return; } - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); // 写入redis, 超时时回复 sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - sendRtpItem.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt); + sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); if (response != null) { sendRtpItem.setToTag(response.getToTag()); } @@ -708,210 +716,168 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } else { // 不在线 拉起 - notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); } - } else { + SendRtpItem sendRtpItem = new SendRtpItem(); + sendRtpItem.setRtcp(platform.isRtcp()); + sendRtpItem.setTcp(mediaTransmissionTCP); + sendRtpItem.setTcpActive(); // 其他平台内容 - otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + otherWvpPushStream(sendRtpItem, request, platform); } } /** * 通知流上线 */ - private void notifyStreamOnline(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServerItem mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - if ("proxy".equals(gbStream.getStreamType())) { - // TODO 控制启用以使设备上线 - logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream()); - // 监听流上线 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId()); - zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> { - OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; - logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); - dynamicTask.stop(callIdHeader.getCallId()); - pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - }); - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream()); - zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); - }, userSetting.getPlatformPlayTimeout()); - boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); - if (!start) { - try { - responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline"); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); - } - zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); - dynamicTask.stop(callIdHeader.getCallId()); + private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + // TODO 控制启用以使设备上线 + logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream()); + // 监听流上线 + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId()); + zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> { + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; + logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); + dynamicTask.stop(callIdHeader.getCallId()); + pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + }); + dynamicTask.startDelay(callIdHeader.getCallId(), () -> { + logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream()); + zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); + }, userSetting.getPlatformPlayTimeout()); + boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); + if (!start) { + try { + responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); } - } else if ("push".equals(gbStream.getStreamType())) { - if (!platform.isStartOfflinePush()) { - // 平台设置中关闭了拉起离线的推流则直接回复 - try { - logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream()); - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); - } - return; - } - // 发送redis消息以使设备上线 - logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream()); - - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, - gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(), - platform.getName(), null, gbStream.getMediaServerId()); - redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); - // 设置超时 - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream()); - try { - redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); - mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); - responseAck(request, Response.REQUEST_TIMEOUT); // 超时 - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("未处理的异常 ", e); - } - }, userSetting.getPlatformPlayTimeout()); - // 添加监听 - int finalPort = port; - Boolean finalTcpActive = tcpActive; - - // 添加在本机上线的通知 - mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> { - dynamicTask.stop(callIdHeader.getCallId()); - redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); - if (serverId.equals(userSetting.getServerId())) { - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, - app, stream, channelId, mediaTransmissionTCP, platform.isRtcp()); - - if (sendRtpItem == null) { - logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException e) { - logger.error("未处理的异常 ", e); - } catch (InvalidArgumentException e) { - logger.error("未处理的异常 ", e); - } catch (ParseException e) { - logger.error("未处理的异常 ", e); - } - return; - } - if (finalTcpActive != null) { - sendRtpItem.setTcpActive(finalTcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); - // 写入redis, 超时时回复 - sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - - sendRtpItem.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt); - if (response != null) { - sendRtpItem.setToTag(response.getToTag()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); - } else { - // 其他平台内容 - otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } - }); - - // 添加回复的拒绝或者错误的通知 - redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> { - if (response.getCode() != 0) { - dynamicTask.stop(callIdHeader.getCallId()); - mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); - try { - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage()); - } - } - }); + zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); + dynamicTask.stop(callIdHeader.getCallId()); } } /** - * 来自其他wvp的推流 + * 通知流上线 */ - private void otherWvpPushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServerItem mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - logger.info("[级联点播]直播流来自其他平台,发送redis消息"); - // 发送redis消息 - redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(), - streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId, - channelId, mediaTransmissionTCP, platform.isRtcp(),platform.getName(), responseSendItemMsg -> { - SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem(); - if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) { - logger.warn("服务器端口资源不足"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException e) { - logger.error("未处理的异常 ", e); - } catch (InvalidArgumentException e) { - logger.error("未处理的异常 ", e); - } catch (ParseException e) { - logger.error("未处理的异常 ", e); - } - return; - } - // 收到sendItem - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); - // 写入redis, 超时时回复 - sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); + private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + if (!platform.isStartOfflinePush()) { + // 平台设置中关闭了拉起离线的推流则直接回复 + try { + logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream()); + responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); + } + return; + } + // 发送redis消息以使设备上线 + logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream()); - sendRtpItem.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(responseSendItemMsg.getMediaServerItem(), request, sendRtpItem, platform, evt); - if (response != null) { - sendRtpItem.setToTag(response.getToTag()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); - }, (wvpResult) -> { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, + gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(), + platform.getName(), userSetting.getServerId(), gbStream.getMediaServerId()); + redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); + // 设置超时 + dynamicTask.startDelay(callIdHeader.getCallId(), () -> { + logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream()); + try { + redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); + mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); + responseAck(request, Response.REQUEST_TIMEOUT); // 超时 + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("未处理的异常 ", e); + } + }, userSetting.getPlatformPlayTimeout()); + // 写入redis待发流信息,供其他wvp读取并生成发流信息 + SendRtpItem sendRtpItemTemp = new SendRtpItem(); + sendRtpItemTemp.setIp(addressStr); + sendRtpItemTemp.setPort(port); + sendRtpItemTemp.setSsrc(ssrc); + sendRtpItemTemp.setPlatformId(requesterId); + sendRtpItemTemp.setPlatformName(platform.getName()); + sendRtpItemTemp.setTcp(mediaTransmissionTCP); + sendRtpItemTemp.setRtcp(platform.isRtcp()); + sendRtpItemTemp.setTcpActive(tcpActive); + sendRtpItemTemp.setPlayType(InviteStreamType.PUSH); + redisCatchStorage.addWaiteSendRtpItem(sendRtpItemTemp, userSetting.getPlatformPlayTimeout()); + // 添加上线的通知 + mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (sendRtpItemFromRedis) -> { + dynamicTask.stop(callIdHeader.getCallId()); + redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); + if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { - // 错误 - if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) { - // 离线 - // 查询是否在本机上线了 - StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream()); - if (currentStreamPushItem.isPushIng()) { - // 在线状态 - pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - - } else { - // 不在线 拉起 - notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } - } + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { + logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足"); try { responseAck(request, Response.BUSY_HERE); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[命令发送失败] 国标级联 点播回复 BUSY_HERE: {}", e.getMessage()); + } catch (SipException e) { + logger.error("未处理的异常 ", e); + } catch (InvalidArgumentException e) { + logger.error("未处理的异常 ", e); + } catch (ParseException e) { + logger.error("未处理的异常 ", e); } - }); + return; + } + sendRtpItemTemp.setLocalPort(localPort); + sendRtpItemTemp.setLocalIp(ObjectUtils.isEmpty(platform.getSendStreamIp()): ); + // 写入redis, 超时时回复 + sendRtpItemTemp.setStatus(1); + sendRtpItemTemp.setCallId(callIdHeader.getCallId()); + + sendRtpItemTemp.setFromTag(request.getFromTag()); + SIPResponse response = sendStreamAck(request, sendRtpItemTemp, platform); + if (response != null) { + sendRtpItemTemp.setToTag(response.getToTag()); + } + redisCatchStorage.updateSendRTPSever(sendRtpItemTemp); + } else { + // 其他平台内容 + otherWvpPushStream(sendRtpItemFromRedis, request, platform); + } + }); + + // 添加回复的拒绝或者错误的通知 + redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> { + if (response.getCode() != 0) { + dynamicTask.stop(callIdHeader.getCallId()); + mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); + try { + responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage()); + } + } + }); } - public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) { - String sdpIp = mediaServerItem.getSdpIp(); + + /** + * 来自其他wvp的推流 + */ + private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) { + logger.info("[级联点播]直播流来自其他平台,发送redis消息"); + // 发送redis消息 + redisCatchStorage.sendStartSendRtp(sendRtpItem); + // 写入redis, 超时时回复 + sendRtpItem.setStatus(1); + sendRtpItem.setCallId(request.getCallIdHeader().getCallId()); + sendRtpItem.setFromTag(request.getFromTag()); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); + if (response != null) { + sendRtpItem.setToTag(response.getToTag()); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); + } + + public SIPResponse sendStreamAck(SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform) { + + String sdpIp = sendRtpItem.getLocalIp(); if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { sdpIp = platform.getSendStreamIp(); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java index 714838ed..6b3c94f8 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java @@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.media.zlm.dto; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; + import java.text.ParseException; /** @@ -7,5 +9,5 @@ import java.text.ParseException; */ public interface ChannelOnlineEvent { - void run(String app, String stream, String serverId) throws ParseException; + void run(SendRtpItem sendRtpItem) throws ParseException; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java index 1a9e3e5c..6a4f866c 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java @@ -61,6 +61,7 @@ public class MessageForPushChannel { messageForPushChannel.setGbId(gbId); messageForPushChannel.setApp(app); messageForPushChannel.setStream(stream); + messageForPushChannel.setServerId(serverId); messageForPushChannel.setMediaServerId(mediaServerId); messageForPushChannel.setPlatFormId(platFormId); messageForPushChannel.setPlatFormName(platFormName); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java deleted file mode 100755 index 3b990f00..00000000 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ /dev/null @@ -1,474 +0,0 @@ -package com.genersoft.iot.vmp.service.redisMsg; - -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.bean.*; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; - -import java.text.ParseException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; - - -/** - * 监听下级发送推送信息,并发送国标推流消息上级 - * @author lin - */ -@Component -public class RedisGbPlayMsgListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisGbPlayMsgListener.class); - - public static final String WVP_PUSH_STREAM_KEY = "WVP_PUSH_STREAM"; - - /** - * 流媒体不存在的错误玛 - */ - public static final int ERROR_CODE_MEDIA_SERVER_NOT_FOUND = -1; - - /** - * 离线的错误玛 - */ - public static final int ERROR_CODE_OFFLINE = -2; - - /** - * 超时的错误玛 - */ - public static final int ERROR_CODE_TIMEOUT = -3; - - private Map callbacks = new ConcurrentHashMap<>(); - private Map callbacksForStartSendRtpStream = new ConcurrentHashMap<>(); - private Map callbacksForError = new ConcurrentHashMap<>(); - - @Autowired - private UserSetting userSetting; - - - @Autowired - private RedisTemplate redisTemplate; - - @Autowired - private ZLMServerFactory zlmServerFactory; - - @Autowired - private IMediaServerService mediaServerService; - - @Autowired - private IRedisCatchStorage redisCatchStorage; - - - @Autowired - private DynamicTask dynamicTask; - - - @Autowired - private ZlmHttpHookSubscribe subscribe; - - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - - - public interface PlayMsgCallback{ - void handler(ResponseSendItemMsg responseSendItemMsg) throws ParseException; - } - - public interface PlayMsgCallbackForStartSendRtpStream{ - void handler(JSONObject jsonObject); - } - - public interface PlayMsgErrorCallback{ - void handler(WVPResult wvpResult); - } - - @Override - public void onMessage(Message message, byte[] bytes) { - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - WvpRedisMsg wvpRedisMsg = JSON.parseObject(msg.getBody(), WvpRedisMsg.class); - logger.info("[收到REDIS通知] 消息: {}", JSON.toJSONString(wvpRedisMsg)); - if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { - continue; - } - if (WvpRedisMsg.isRequest(wvpRedisMsg)) { - logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody())); - - switch (wvpRedisMsg.getCmd()){ - case WvpRedisMsgCmd.GET_SEND_ITEM: - RequestSendItemMsg content = JSON.parseObject(wvpRedisMsg.getContent(), RequestSendItemMsg.class); - requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); - break; - case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: - RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent()); - requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); - break; - case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM: - RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent()); - requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); - break; - default: - break; - } - - }else { - logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody())); - switch (wvpRedisMsg.getCmd()){ - case WvpRedisMsgCmd.GET_SEND_ITEM: - - WVPResult content = JSON.to(WVPResult.class, wvpRedisMsg.getContent()); - - String key = wvpRedisMsg.getSerial(); - switch (content.getCode()) { - case 0: - ResponseSendItemMsg responseSendItemMsg =JSON.to(ResponseSendItemMsg.class, content.getData()); - PlayMsgCallback playMsgCallback = callbacks.get(key); - if (playMsgCallback != null) { - callbacksForError.remove(key); - try { - playMsgCallback.handler(responseSendItemMsg); - } catch (ParseException e) { - logger.error("[REDIS消息处理异常] ", e); - } - } - break; - case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: - case ERROR_CODE_OFFLINE: - case ERROR_CODE_TIMEOUT: - PlayMsgErrorCallback errorCallback = callbacksForError.get(key); - if (errorCallback != null) { - callbacks.remove(key); - errorCallback.handler(content); - } - break; - default: - break; - } - break; - case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: - WVPResult wvpResult = JSON.to(WVPResult.class, wvpRedisMsg.getContent()); - String serial = wvpRedisMsg.getSerial(); - switch (wvpResult.getCode()) { - case 0: - JSONObject jsonObject = (JSONObject)wvpResult.getData(); - PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); - if (playMsgCallback != null) { - callbacksForError.remove(serial); - playMsgCallback.handler(jsonObject); - } - break; - case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: - case ERROR_CODE_OFFLINE: - case ERROR_CODE_TIMEOUT: - PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); - if (errorCallback != null) { - callbacks.remove(serial); - errorCallback.handler(wvpResult); - } - break; - default: - break; - } - break; - default: - break; - } - - } - }catch (Exception e) { - logger.warn("[RedisGbPlayMsg] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); - logger.error("[RedisGbPlayMsg] 异常内容: ", e); - } - } - }); - } - } - - /** - * 处理收到的请求推流的请求 - */ - private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) { - MediaServerItem mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId()); - if (mediaInfo == null) { - // TODO 回复错误 - return; - } - String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1"; - Map param = new HashMap<>(); - param.put("vhost","__defaultVhost__"); - param.put("app",requestPushStreamMsg.getApp()); - param.put("stream",requestPushStreamMsg.getStream()); - param.put("ssrc", requestPushStreamMsg.getSsrc()); - param.put("dst_url",requestPushStreamMsg.getIp()); - param.put("dst_port", requestPushStreamMsg.getPort()); - param.put("is_udp", is_Udp); - param.put("src_port", requestPushStreamMsg.getSrcPort()); - param.put("pt", requestPushStreamMsg.getPt()); - param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0"); - param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0"); - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaInfo, param); - // 回复消息 - responsePushStream(jsonObject, fromId, serial); - } - - private void responsePushStream(JSONObject content, String toId, String serial) { - - WVPResult result = new WVPResult<>(); - result.setCode(0); - result.setData(content); - - WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, - WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result)); - JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - } - - /** - * 处理收到的请求sendItem的请求 - */ - private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) { - MediaServerItem mediaServerItem = mediaServerService.getOne(content.getMediaServerId()); - if (mediaServerItem == null) { - logger.info("[回复推流信息] 流媒体{}不存在 ", content.getMediaServerId()); - - WVPResult result = new WVPResult<>(); - result.setCode(ERROR_CODE_MEDIA_SERVER_NOT_FOUND); - result.setMsg("流媒体不存在"); - - WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, - WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)); - - JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - return; - } - // 确定流是否在线 - Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); - if (streamReady != null && streamReady) { - logger.info("[回复推流信息] {}/{}", content.getApp(), content.getStream()); - responseSendItem(mediaServerItem, content, toId, serial); - }else { - // 流已经离线 - // 发送redis消息以使设备上线 - logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流",content.getApp(), content.getStream()); - - String taskKey = UUID.randomUUID().toString(); - // 设置超时 - dynamicTask.startDelay(taskKey, ()->{ - logger.info("[ app={}, stream={} ] 等待设备开始推流超时", content.getApp(), content.getStream()); - WVPResult result = new WVPResult<>(); - result.setCode(ERROR_CODE_TIMEOUT); - WvpRedisMsg response = WvpRedisMsg.getResponseInstance( - userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result) - ); - JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - }, userSetting.getPlatformPlayTimeout()); - - // 添加订阅 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtsp", mediaServerItem.getId()); - - subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam)->{ - dynamicTask.stop(taskKey); - responseSendItem(mediaServerItem, content, toId, serial); - }); - - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(), - content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(), - content.getMediaServerId()); - - String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED; - logger.info("[redis发送通知] 推流被请求 {}: {}/{}", key, messageForPushChannel.getApp(), messageForPushChannel.getStream()); - redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel)); - } - } - - /** - * 将获取到的sendItem发送出去 - */ - private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) { - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), - content.getPort(), content.getSsrc(), content.getPlatformId(), - content.getApp(), content.getStream(), content.getChannelId(), - content.getTcp(), content.getRtcp()); - - WVPResult result = new WVPResult<>(); - result.setCode(0); - ResponseSendItemMsg responseSendItemMsg = new ResponseSendItemMsg(); - responseSendItemMsg.setSendRtpItem(sendRtpItem); - responseSendItemMsg.setMediaServerItem(mediaServerItem); - result.setData(responseSendItemMsg); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - - WvpRedisMsg response = WvpRedisMsg.getResponseInstance( - userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result) - ); - JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - } - - /** - * 发送消息要求下级生成推流信息 - * @param serverId 下级服务ID - * @param app 应用名 - * @param stream 流ID - * @param ip 目标IP - * @param port 目标端口 - * @param ssrc ssrc - * @param platformId 平台国标编号 - * @param channelId 通道ID - * @param isTcp 是否使用TCP - * @param callback 得到信息的回调 - */ - public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc, - String platformId, String channelId, boolean isTcp, boolean rtcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) { - RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance( - serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, rtcp, platformName); - requestSendItemMsg.setServerId(serverId); - String key = UUID.randomUUID().toString(); - WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM, - key, JSON.toJSONString(requestSendItemMsg)); - - JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); - logger.info("[请求推流SendItem] {}: {}", serverId, jsonObject); - callbacks.put(key, callback); - callbacksForError.put(key, errorCallback); - dynamicTask.startDelay(key, ()->{ - callbacks.remove(key); - callbacksForError.remove(key); - WVPResult wvpResult = new WVPResult<>(); - wvpResult.setCode(ERROR_CODE_TIMEOUT); - wvpResult.setMsg("timeout"); - errorCallback.handler(wvpResult); - }, userSetting.getPlatformPlayTimeout()); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - } - - /** - * 发送请求推流的消息 - * @param param 推流参数 - * @param callback 回调 - */ - public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) { - String key = UUID.randomUUID().toString(); - WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, - WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, JSON.toJSONString(param)); - - JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); - logger.info("[REDIS 请求其他平台推流] {}: {}", serverId, jsonObject); - dynamicTask.startDelay(key, ()->{ - callbacksForStartSendRtpStream.remove(key); - callbacksForError.remove(key); - }, userSetting.getPlatformPlayTimeout()); - callbacksForStartSendRtpStream.put(key, callback); - callbacksForError.put(key, (wvpResult)->{ - logger.info("[REDIS 请求其他平台推流] 失败: {}", wvpResult.getMsg()); - callbacksForStartSendRtpStream.remove(key); - callbacksForError.remove(key); - }); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - } - - /** - * 发送请求推流的消息 - */ - public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) { - String key = UUID.randomUUID().toString(); - WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, - WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg)); - - JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); - logger.info("[REDIS 请求其他平台停止推流] {}: {}", serverId, jsonObject); - redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); - } - - private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) { - if (platformGbId == null) { - platformGbId = "*"; - } - if (channelId == null) { - channelId = "*"; - } - if (streamId == null) { - streamId = "*"; - } - if (callId == null) { - callId = "*"; - } - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*_" - + platformGbId + "_" - + channelId + "_" - + streamId + "_" - + callId; - List scan = RedisUtil.scan(redisTemplate, key); - if (scan.size() > 0) { - return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0)); - }else { - return null; - } - } - - /** - * 处理收到的请求推流的请求 - */ - private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) { - SendRtpItem sendRtpItem = streamMsg.getSendRtpItem(); - if (sendRtpItem == null) { - logger.info("[REDIS 执行其他平台的请求停止推流] 失败: sendRtpItem为NULL"); - return; - } - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - if (mediaInfo == null) { - // TODO 回复错误 - return; - } - Map param = new HashMap<>(); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStream()); - param.put("ssrc", sendRtpItem.getSsrc()); - - if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) { - logger.info("[REDIS 执行其他平台的请求停止推流] 成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); - // 发送redis消息 - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), - sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); - messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex()); - redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); - } - - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java new file mode 100755 index 00000000..14a96e83 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java @@ -0,0 +1,81 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * 收到消息后开始给上级发流 + * @author lin + */ +@Component +public class RedisPlatformStartSendRtpListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisPlatformStartSendRtpListener.class); + + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + + @Autowired + private UserSetting userSetting; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + + @Override + public void onMessage(Message message, byte[] bytes) { + logger.info("[REDIS消息-收到上级等到设备推流的redis消息]: {}", new String(message.getBody())); + boolean isEmpty = taskQueue.isEmpty(); + taskQueue.offer(message); + if (isEmpty) { + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + try { + MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class); + if (messageForPushChannel == null + || ObjectUtils.isEmpty(messageForPushChannel.getApp()) + || ObjectUtils.isEmpty(messageForPushChannel.getStream()) + || userSetting.getServerId().equals(messageForPushChannel.getServerId())){ + continue; + } + + // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 + HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( + messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp", + null); + hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { + // 读取redis中的上级点播信息,生成sendRtpItm发送出去 + + }); + + + }catch (Exception e) { + logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); + logger.error("[REDIS消息-请求推流结果] 异常内容: ", e); + } + } + }); + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java new file mode 100755 index 00000000..f3b415d7 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java @@ -0,0 +1,81 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * 上级等到设备推流的redis消息 + * @author lin + */ +@Component +public class RedisPlatformWaitPushStreamOnlineListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisPlatformWaitPushStreamOnlineListener.class); + + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + + @Autowired + private UserSetting userSetting; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + + @Override + public void onMessage(Message message, byte[] bytes) { + logger.info("[REDIS消息-收到上级等到设备推流的redis消息]: {}", new String(message.getBody())); + boolean isEmpty = taskQueue.isEmpty(); + taskQueue.offer(message); + if (isEmpty) { + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + try { + MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class); + if (messageForPushChannel == null + || ObjectUtils.isEmpty(messageForPushChannel.getApp()) + || ObjectUtils.isEmpty(messageForPushChannel.getStream()) + || userSetting.getServerId().equals(messageForPushChannel.getServerId())){ + continue; + } + + // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 + HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( + messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp", + null); + hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { + // 读取redis中的上级点播信息,生成sendRtpItm发送出去 + + }); + + + }catch (Exception e) { + logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); + logger.error("[REDIS消息-请求推流结果] 异常内容: ", e); + } + } + }); + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 66db1039..1e5e93dd 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -217,4 +217,7 @@ public interface IRedisCatchStorage { void sendPushStreamClose(MessageForPushChannel messageForPushChannel); + void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout); + + void sendStartSendRtp(SendRtpItem sendRtpItem); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 1eac4dfd..60084df1 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -678,4 +678,16 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { logger.info("[redis发送通知] 发送 停止向上级推流 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId()); redisTemplate.convertAndSend(key, JSON.toJSON(msg)); } + + @Override + public void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout) { + String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream(); + redisTemplate.opsForValue().set(key, platformPlayTimeout); + } + + @Override + public void sendStartSendRtp(SendRtpItem sendRtpItem) { + String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream(); + redisTemplate.opsForValue().set(key, JSON.toJSONString(sendRtpItem)); + } }