From b4168c02cba462571dd3f5bdc1d0b1ffddbc938a Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 16 Apr 2024 00:10:38 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=A4=9Awvp=E5=9B=BD?= =?UTF-8?q?=E6=A0=87=E7=BA=A7=E8=81=94=E6=8E=A8=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/common/VideoManagerConstants.java | 1 + .../vmp/conf/redis/RedisMsgListenConfig.java | 10 +- .../iot/vmp/gb28181/bean/SendRtpItem.java | 16 +- .../request/impl/AckRequestProcessor.java | 12 +- .../request/impl/ByeRequestProcessor.java | 5 +- .../request/impl/InviteRequestProcessor.java | 188 ++++++++---------- .../vmp/media/zlm/ZLMHttpHookListener.java | 26 +-- .../vmp/media/zlm/ZLMMediaListManager.java | 138 ------------- .../iot/vmp/service/IStreamPushService.java | 1 + .../iot/vmp/service/impl/PlayServiceImpl.java | 14 +- .../service/impl/StreamPushServiceImpl.java | 17 ++ .../RedisPlatformPushStreamOnlineLister.java | 97 +++++++++ .../RedisPlatformStartSendRtpListener.java | 77 +++++-- ...sPlatformWaitPushStreamOnlineListener.java | 27 ++- .../redisMsg/RedisStreamMsgListener.java | 97 --------- .../iot/vmp/storager/IRedisCatchStorage.java | 4 + .../storager/impl/RedisCatchStorageImpl.java | 13 ++ 17 files changed, 335 insertions(+), 408 deletions(-) delete mode 100755 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java create mode 100755 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java delete mode 100755 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index af574b93..efe40880 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -74,6 +74,7 @@ public class VideoManagerConstants { public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_"; public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:"; public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:"; + public static final String PUSH_STREAM_ONLINE = "VMP_PUSH_STREAM_ONLINE:"; diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java index fb88f54a..5385efab 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java @@ -28,9 +28,6 @@ public class RedisMsgListenConfig { @Autowired private RedisAlarmMsgListener redisAlarmMsgListener; - @Autowired - private RedisStreamMsgListener redisStreamMsgListener; - @Autowired private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener; @@ -52,6 +49,9 @@ public class RedisMsgListenConfig { @Autowired private RedisPlatformStartSendRtpListener redisPlatformStartSendRtpListener; + @Autowired + private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister; + /** * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 @@ -67,14 +67,14 @@ public class RedisMsgListenConfig { container.setConnectionFactory(connectionFactory); container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS)); container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); - container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH")); container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED)); - container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.WAITE_SEND_PUSH_STREAM)); container.addMessageListener(redisPlatformStartSendRtpListener, new PatternTopic(VideoManagerConstants.START_SEND_PUSH_STREAM)); + container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED)); + container.addMessageListener(redisPlatformPushStreamOnlineLister, new PatternTopic(VideoManagerConstants.PUSH_STREAM_ONLINE)); return container; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index c0507df0..f1744d1e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -132,6 +132,11 @@ public class SendRtpItem { */ private String receiveStream; + /** + * 上级的点播类型 + */ + private String sessionName; + public String getIp() { return ip; } @@ -332,6 +337,14 @@ public class SendRtpItem { this.localIp = localIp; } + public String getSessionName() { + return sessionName; + } + + public void setSessionName(String sessionName) { + this.sessionName = sessionName; + } + @Override public String toString() { return "SendRtpItem{" + @@ -347,7 +360,7 @@ public class SendRtpItem { ", stream='" + stream + '\'' + ", tcp=" + tcp + ", tcpActive=" + tcpActive + - ", localIp=" + localIp + + ", localIp='" + localIp + '\'' + ", localPort=" + localPort + ", mediaServerId='" + mediaServerId + '\'' + ", serverId='" + serverId + '\'' + @@ -360,6 +373,7 @@ public class SendRtpItem { ", rtcp=" + rtcp + ", playType=" + playType + ", receiveStream='" + receiveStream + '\'' + + ", sessionName='" + sessionName + '\'' + '}'; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 242e5ef6..af7db2e3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -16,7 +16,6 @@ import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; -import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import org.slf4j.Logger; @@ -77,9 +76,6 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private DynamicTask dynamicTask; - @Autowired - private RedisGbPlayMsgListener redisGbPlayMsgListener; - @Autowired private IPlayService playService; @@ -117,13 +113,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In if (parentPlatform != null) { Map param = getSendRtpParam(sendRtpItem); if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { - RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( - sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(), - sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), - sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); - redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { - playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, json, param, callIdHeader); - }); + redisCatchStorage.sendStartSendRtp(sendRtpItem); } else { JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param); if (startSendRtpStreamResult != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index ff7427bc..69f81425 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -19,7 +19,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg; -import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; @@ -98,8 +97,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private IStreamPushService pushService; - @Autowired - private RedisGbPlayMsgListener redisGbPlayMsgListener; @Override public void afterPropertiesSet() throws Exception { @@ -142,7 +139,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); if (platform != null) { RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId()); - redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg); +// redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg); } }else { MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 35399227..f12f38ac 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -19,7 +19,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; -import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; +import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; @@ -122,7 +122,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements private UserSetting userSetting; @Autowired - private ZLMMediaListManager mediaListManager; + private RedisPlatformPushStreamOnlineLister mediaListManager; @Autowired private SipConfig config; @@ -568,19 +568,16 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } else if (gbStream != null) { - - String ssrc; - if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { - // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 - ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); - }else { - ssrc = gb28181Sdp.getSsrc(); - } SendRtpItem sendRtpItem = new SendRtpItem(); - sendRtpItem.setTcpActive(tcpActive); + if (!userSetting.getUseCustomSsrcForParentInvite() && gb28181Sdp.getSsrc() != null) { + sendRtpItem.setSsrc(gb28181Sdp.getSsrc()); + } + + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } sendRtpItem.setTcp(mediaTransmissionTCP); sendRtpItem.setRtcp(platform.isRtcp()); - sendRtpItem.setSsrc(ssrc); sendRtpItem.setPlatformName(platform.getName()); sendRtpItem.setPlatformId(platform.getServerGBId()); sendRtpItem.setMediaServerId(mediaServerItem.getId()); @@ -593,37 +590,48 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setFromTag(request.getFromTag()); sendRtpItem.setOnlyAudio(false); - sendRtpItem.setPlayType(InviteStreamType.PUSH); sendRtpItem.setStatus(0); + sendRtpItem.setSessionName(sessionName); if ("push".equals(gbStream.getStreamType())) { if (streamPushItem != null) { // 从redis查询是否正在接收这个推流 OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); - sendRtpItem.setServerId(pushListItem.getSeverId()); if (pushListItem != null) { + sendRtpItem.setServerId(pushListItem.getSeverId()); StreamPushItem transform = streamPushService.transform(pushListItem); transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); // 推流状态 - pushStream(sendRtpItem, mediaServerItem, platform, request); + sendPushStream(sendRtpItem, mediaServerItem, platform, request); }else { - // 未推流 拉起 + if (!platform.isStartOfflinePush()) { + // 平台设置中关闭了拉起离线的推流则直接回复 + try { + logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", sendRtpItem.getChannelId(), sendRtpItem.getApp(), sendRtpItem.getStream()); + responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); + } + return; + } notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } } else if ("proxy".equals(gbStream.getStreamType())) { if (null != proxyByAppAndStream) { + if (sendRtpItem.getSsrc() == null) { + // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 + String ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } if (proxyByAppAndStream.isStatus()) { - pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + sendProxyStream(sendRtpItem, mediaServerItem, platform, request); } else { //开启代理拉流 notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } - - } } } @@ -649,33 +657,23 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements /** * 安排推流 */ - private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServerItem mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + private void sendProxyStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); if (streamReady != null && streamReady) { // 自平台内容 - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); - - if (sendRtpItem == null) { - logger.warn("服务器端口资源不足"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { + logger.warn("服务器端口资源不足"); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); + } + return; } - return; - } - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); + sendRtpItem.setPlayType(InviteStreamType.PROXY); // 写入redis, 超时时回复 sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - sendRtpItem.setFromTag(request.getFromTag()); sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); SIPResponse response = sendStreamAck(request, sendRtpItem, platform); @@ -683,12 +681,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setToTag(response.getToTag()); } redisCatchStorage.updateSendRTPSever(sendRtpItem); - } - } - private void pushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + private void sendPushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { // 推流 if (sendRtpItem.getServerId().equals(userSetting.getServerId())) { Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); @@ -712,6 +708,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (response != null) { sendRtpItem.setToTag(response.getToTag()); } + if (sendRtpItem.getSsrc() == null) { + // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } redisCatchStorage.updateSendRTPSever(sendRtpItem); } else { @@ -719,10 +720,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } else { - SendRtpItem sendRtpItem = new SendRtpItem(); - sendRtpItem.setRtcp(platform.isRtcp()); - sendRtpItem.setTcp(mediaTransmissionTCP); - sendRtpItem.setTcpActive(); // 其他平台内容 otherWvpPushStream(sendRtpItem, request, platform); } @@ -733,29 +730,28 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements */ private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { // TODO 控制启用以使设备上线 - logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream()); + logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", sendRtpItem.getApp(), sendRtpItem.getStream()); // 监听流上线 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId()); + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", mediaServerItem.getId()); zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> { OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); - dynamicTask.stop(callIdHeader.getCallId()); - pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + dynamicTask.stop(sendRtpItem.getCallId()); + sendProxyStream(sendRtpItem, mediaServerItem, platform, request); }); - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream()); + dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { + logger.info("[ app={}, stream={} ] 等待拉流代理流超时", sendRtpItem.getApp(), sendRtpItem.getStream()); zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); }, userSetting.getPlatformPlayTimeout()); - boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); + boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream()); if (!start) { try { - responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline"); + responseAck(request, Response.BUSY_HERE, "channel [" + sendRtpItem.getChannelId() + "] offline"); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); } zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); - dynamicTask.stop(callIdHeader.getCallId()); + dynamicTask.stop(sendRtpItem.getCallId()); } } @@ -763,50 +759,28 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements * 通知流上线 */ private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { - if (!platform.isStartOfflinePush()) { - // 平台设置中关闭了拉起离线的推流则直接回复 - try { - logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream()); - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); - } - return; - } - // 发送redis消息以使设备上线 - logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream()); - + // 发送redis消息以使设备上线,流上线后被 + logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", sendRtpItem.getApp(), sendRtpItem.getStream()); MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, - gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(), - platform.getName(), userSetting.getServerId(), gbStream.getMediaServerId()); + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(), + platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); // 设置超时 - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream()); + dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { + logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream()); try { - redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); - mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); + redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream()); + mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); responseAck(request, Response.REQUEST_TIMEOUT); // 超时 } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("未处理的异常 ", e); } }, userSetting.getPlatformPlayTimeout()); - // 写入redis待发流信息,供其他wvp读取并生成发流信息 - SendRtpItem sendRtpItemTemp = new SendRtpItem(); - sendRtpItemTemp.setIp(addressStr); - sendRtpItemTemp.setPort(port); - sendRtpItemTemp.setSsrc(ssrc); - sendRtpItemTemp.setPlatformId(requesterId); - sendRtpItemTemp.setPlatformName(platform.getName()); - sendRtpItemTemp.setTcp(mediaTransmissionTCP); - sendRtpItemTemp.setRtcp(platform.isRtcp()); - sendRtpItemTemp.setTcpActive(tcpActive); - sendRtpItemTemp.setPlayType(InviteStreamType.PUSH); - redisCatchStorage.addWaiteSendRtpItem(sendRtpItemTemp, userSetting.getPlatformPlayTimeout()); + redisCatchStorage.addWaiteSendRtpItem(sendRtpItem, userSetting.getPlatformPlayTimeout()); // 添加上线的通知 - mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (sendRtpItemFromRedis) -> { - dynamicTask.stop(callIdHeader.getCallId()); - redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); + mediaListManager.addChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream(), (sendRtpItemFromRedis) -> { + dynamicTask.stop(sendRtpItem.getCallId()); + redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream()); if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { int localPort = sendRtpPortManager.getNextPort(mediaServerItem); @@ -823,18 +797,18 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } return; } - sendRtpItemTemp.setLocalPort(localPort); - sendRtpItemTemp.setLocalIp(ObjectUtils.isEmpty(platform.getSendStreamIp()): ); - // 写入redis, 超时时回复 - sendRtpItemTemp.setStatus(1); - sendRtpItemTemp.setCallId(callIdHeader.getCallId()); - - sendRtpItemTemp.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(request, sendRtpItemTemp, platform); - if (response != null) { - sendRtpItemTemp.setToTag(response.getToTag()); + sendRtpItem.setLocalPort(localPort); + if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { + sendRtpItem.setLocalIp(platform.getSendStreamIp()); } - redisCatchStorage.updateSendRTPSever(sendRtpItemTemp); + + // 写入redis, 超时时回复 + sendRtpItem.setStatus(1); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); + if (response != null) { + sendRtpItem.setToTag(response.getToTag()); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); } else { // 其他平台内容 otherWvpPushStream(sendRtpItemFromRedis, request, platform); @@ -842,10 +816,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements }); // 添加回复的拒绝或者错误的通知 - redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> { + redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { if (response.getCode() != 0) { - dynamicTask.stop(callIdHeader.getCallId()); - mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); + dynamicTask.stop(sendRtpItem.getCallId()); + mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); try { responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); } catch (SipException | InvalidArgumentException | ParseException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 65685aa1..346b4a2f 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -18,14 +18,12 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; -import com.genersoft.iot.vmp.media.zlm.dto.HookType; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -103,7 +101,7 @@ public class ZLMHttpHookListener { private EventPublisher eventPublisher; @Autowired - private ZLMMediaListManager zlmMediaListManager; + private RedisPlatformPushStreamOnlineLister zlmMediaListManager; @Autowired private ZlmHttpHookSubscribe subscribe; @@ -130,6 +128,9 @@ public class ZLMHttpHookListener { @Autowired private RedisTemplate redisTemplate; + @Autowired + private IStreamPushService streamPushService; + /** * 服务器定时上报时间,上报间隔可配置,默认10s上报一次 */ @@ -236,11 +237,8 @@ public class ZLMHttpHookListener { // 鉴权通过 redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo); } - } else { - zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId()); } - HookResultForOnPublish result = HookResultForOnPublish.SUCCESS(); result.setEnable_audio(true); taskExecutor.execute(() -> { @@ -465,8 +463,7 @@ public class ZLMHttpHookListener { || param.getOriginType() == OriginType.RTMP_PUSH.ordinal() || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { param.setSeverId(userSetting.getServerId()); - zlmMediaListManager.addPush(param); - + streamPushService.updatePush(param); // 冗余数据,自己系统中自用 redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param); } @@ -483,10 +480,13 @@ public class ZLMHttpHookListener { } } GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); - if (gbStream != null) { -// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); + // 查找是否关联了国标, 关联了不删除, 置为离线 + if (gbStream == null) { + storager.removeMedia(param.getApp(), param.getStream()); + }else { +// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); + storager.mediaOffline(param.getApp(), param.getStream()); } - zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); } GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); if (gbStream != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java deleted file mode 100755 index cbc5fde6..00000000 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ /dev/null @@ -1,138 +0,0 @@ -package com.genersoft.iot.vmp.media.zlm; - -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.GbStream; -import com.genersoft.iot.vmp.media.zlm.dto.*; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.IStreamProxyService; -import com.genersoft.iot.vmp.service.IStreamPushService; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; -import com.genersoft.iot.vmp.utils.DateUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.text.ParseException; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - -/** - * @author lin - */ -@Component -public class ZLMMediaListManager { - - private Logger logger = LoggerFactory.getLogger("ZLMMediaListManager"); - - @Autowired - private ZLMRESTfulUtils zlmresTfulUtils; - - @Autowired - private IRedisCatchStorage redisCatchStorage; - - @Autowired - private IVideoManagerStorage storager; - - @Autowired - private GbStreamMapper gbStreamMapper; - - @Autowired - private PlatformGbStreamMapper platformGbStreamMapper; - - @Autowired - private IStreamPushService streamPushService; - - @Autowired - private IStreamProxyService streamProxyService; - - @Autowired - private StreamPushMapper streamPushMapper; - - @Autowired - private ZlmHttpHookSubscribe subscribe; - - @Autowired - private UserSetting userSetting; - - @Autowired - private ZLMServerFactory zlmServerFactory; - - @Autowired - private IMediaServerService mediaServerService; - - private Map channelOnPublishEvents = new ConcurrentHashMap<>(); - - public StreamPushItem addPush(OnStreamChangedHookParam onStreamChangedHookParam) { - StreamPushItem transform = streamPushService.transform(onStreamChangedHookParam); - StreamPushItem pushInDb = streamPushService.getPush(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); - transform.setPushIng(onStreamChangedHookParam.isRegist()); - transform.setUpdateTime(DateUtil.getNow()); - transform.setPushTime(DateUtil.getNow()); - transform.setSelf(userSetting.getServerId().equals(onStreamChangedHookParam.getSeverId())); - if (pushInDb == null) { - transform.setCreateTime(DateUtil.getNow()); - streamPushMapper.add(transform); - }else { - streamPushMapper.update(transform); - gbStreamMapper.updateMediaServer(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), onStreamChangedHookParam.getMediaServerId()); - } - ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream()); - if ( channelOnlineEventLister != null) { - try { - channelOnlineEventLister.run(transform.getApp(), transform.getStream(), transform.getServerId());; - } catch (ParseException e) { - logger.error("addPush: ", e); - } - removedChannelOnlineEventLister(transform.getApp(), transform.getStream()); - } - return transform; - } - - public void sendStreamEvent(String app, String stream, String mediaServerId) { - MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); - // 查看推流状态 - Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); - if (streamReady != null && streamReady) { - ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(app, stream); - if (channelOnlineEventLister != null) { - try { - channelOnlineEventLister.run(app, stream, mediaServerId); - } catch (ParseException e) { - logger.error("sendStreamEvent: ", e); - } - removedChannelOnlineEventLister(app, stream); - } - } - } - - public int removeMedia(String app, String streamId) { - // 查找是否关联了国标, 关联了不删除, 置为离线 - GbStream gbStream = gbStreamMapper.selectOne(app, streamId); - int result; - if (gbStream == null) { - result = storager.removeMedia(app, streamId); - }else { - result =storager.mediaOffline(app, streamId); - } - return result; - } - - public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) { - this.channelOnPublishEvents.put(app + "_" + stream, callback); - } - - public void removedChannelOnlineEventLister(String app, String stream) { - this.channelOnPublishEvents.remove(app + "_" + stream); - } - - public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) { - return this.channelOnPublishEvents.get(app + "_" + stream); - } - -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index 10b1eff1..32a42b10 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -118,4 +118,5 @@ public interface IStreamPushService { Map getAllAppAndStreamMap(); + void updatePush(OnStreamChangedHookParam param); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 8c0e9b04..bc052767 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -31,7 +31,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.*; -import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper; @@ -134,9 +133,6 @@ public class PlayServiceImpl implements IPlayService { @Autowired private ThreadPoolTaskExecutor taskExecutor; - @Autowired - private RedisGbPlayMsgListener redisGbPlayMsgListener; - @Autowired private ZlmHttpHookSubscribe hookSubscribe; @@ -1366,15 +1362,7 @@ public class PlayServiceImpl implements IPlayService { param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "1" : "0"); } - if (mediaInfo == null) { - RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( - sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(), - sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), - sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); - redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { - startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader); - }); - } else { + if (mediaInfo != null) { // 如果是严格模式,需要关闭端口占用 JSONObject startSendRtpStreamResult = null; if (sendRtpItem.getLocalPort() != 0) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index e2d7e68c..bb51edc1 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -553,4 +553,21 @@ public class StreamPushServiceImpl implements IStreamPushService { public Map getAllAppAndStreamMap() { return streamPushMapper.getAllAppAndStreamMap(); } + + @Override + public void updatePush(OnStreamChangedHookParam param) { + StreamPushItem transform = transform(param); + StreamPushItem pushInDb = getPush(param.getApp(), param.getStream()); + transform.setPushIng(param.isRegist()); + transform.setUpdateTime(DateUtil.getNow()); + transform.setPushTime(DateUtil.getNow()); + transform.setSelf(userSetting.getServerId().equals(param.getSeverId())); + if (pushInDb == null) { + transform.setCreateTime(DateUtil.getNow()); + streamPushMapper.add(transform); + }else { + streamPushMapper.update(transform); + gbStreamMapper.updateMediaServer(param.getApp(), param.getStream(), param.getMediaServerId()); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java new file mode 100755 index 00000000..8ad08071 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java @@ -0,0 +1,97 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IStreamProxyService; +import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; +import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; +import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; +import com.genersoft.iot.vmp.utils.DateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.text.ParseException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * @author lin + */ +@Component +public class RedisPlatformPushStreamOnlineLister implements MessageListener { + + private final Logger logger = LoggerFactory.getLogger("RedisPlatformPushStreamOnlineLister"); + + private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + /** + * 通过redis消息接收流上线的通知,如果本机由对这个流的监听,则回调 + */ + @Override + public void onMessage(Message message, byte[] pattern) { + boolean isEmpty = taskQueue.isEmpty(); + taskQueue.offer(message); + if (isEmpty) { + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class); + sendStreamEvent(sendRtpItem); + } + }); + } + } + + private final Map channelOnPublishEvents = new ConcurrentHashMap<>(); + + public void sendStreamEvent(SendRtpItem sendRtpItem) { + // 查看推流状态 + ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); + if (channelOnlineEventLister != null) { + try { + channelOnlineEventLister.run(sendRtpItem); + } catch (ParseException e) { + logger.error("sendStreamEvent: ", e); + } + removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); + } + } + + public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) { + this.channelOnPublishEvents.put(app + "_" + stream, callback); + } + + public void removedChannelOnlineEventLister(String app, String stream) { + this.channelOnPublishEvents.remove(app + "_" + stream); + } + + public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) { + return this.channelOnPublishEvents.get(app + "_" + stream); + } + + +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java index 14a96e83..25dd334c 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java @@ -1,12 +1,16 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; +import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,6 +22,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -32,10 +38,10 @@ public class RedisPlatformStartSendRtpListener implements MessageListener { private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Autowired - private UserSetting userSetting; + private ZLMServerFactory zlmServerFactory; @Autowired - private ZlmHttpHookSubscribe hookSubscribe; + private IMediaServerService mediaServerService; @Qualifier("taskExecutor") @Autowired @@ -52,23 +58,14 @@ public class RedisPlatformStartSendRtpListener implements MessageListener { while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); try { - MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class); - if (messageForPushChannel == null - || ObjectUtils.isEmpty(messageForPushChannel.getApp()) - || ObjectUtils.isEmpty(messageForPushChannel.getStream()) - || userSetting.getServerId().equals(messageForPushChannel.getServerId())){ - continue; + SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class); + sendRtpItem.getMediaServerId(); + MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaServer == null) { + return; } - - // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 - HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( - messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp", - null); - hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { - // 读取redis中的上级点播信息,生成sendRtpItm发送出去 - - }); - + Map sendRtpParam = getSendRtpParam(sendRtpItem); + sendRtp(sendRtpItem, mediaServer, sendRtpParam); }catch (Exception e) { logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); @@ -78,4 +75,48 @@ public class RedisPlatformStartSendRtpListener implements MessageListener { }); } } + + private Map getSendRtpParam(SendRtpItem sendRtpItem) { + String isUdp = sendRtpItem.isTcp() ? "0" : "1"; + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStream()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("dst_url",sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + param.put("is_udp", isUdp); + if (!sendRtpItem.isTcp()) { + // udp模式下开启rtcp保活 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + } + return param; + } + + private JSONObject sendRtp(SendRtpItem sendRtpItem, MediaServerItem mediaInfo, Map param){ + JSONObject startSendRtpStreamResult = null; + if (sendRtpItem.getLocalPort() != 0) { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param); + }else { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param); + } + }else { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param); + }else { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param); + } + } + return startSendRtpStreamResult; + + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java index f3b415d7..25600a23 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java @@ -2,12 +2,15 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -34,14 +37,26 @@ public class RedisPlatformWaitPushStreamOnlineListener implements MessageListene @Autowired private UserSetting userSetting; + @Autowired + private IRedisCatchStorage redisCatchStorage; + @Autowired private ZlmHttpHookSubscribe hookSubscribe; + @Autowired + private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister; + + @Autowired + private SSRCFactory ssrcFactory; + @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; + /** + * 当上级点播时,这里负责监听等到流上线,流上线后如果是在当前服务则直接回调,如果是其他wvp,则由redis消息进行通知 + */ @Override public void onMessage(Message message, byte[] bytes) { logger.info("[REDIS消息-收到上级等到设备推流的redis消息]: {}", new String(message.getBody())); @@ -66,7 +81,17 @@ public class RedisPlatformWaitPushStreamOnlineListener implements MessageListene null); hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { // 读取redis中的上级点播信息,生成sendRtpItm发送出去 - + SendRtpItem sendRtpItem = redisCatchStorage.getWaiteSendRtpItem(messageForPushChannel.getApp(), messageForPushChannel.getStream()); + if (sendRtpItem.getSsrc() == null) { + // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId()); + sendRtpItem.setSsrc(ssrc); + sendRtpItem.setMediaServerId(mediaServerItemInUse.getId()); + sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); + redisPlatformPushStreamOnlineLister.sendStreamEvent(sendRtpItem); + // 通知其他wvp, 由RedisPlatformPushStreamOnlineLister接收此监听。 + redisCatchStorage.sendPushStreamOnline(sendRtpItem); + } }); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java deleted file mode 100755 index 7d5ba609..00000000 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java +++ /dev/null @@ -1,97 +0,0 @@ -package com.genersoft.iot.vmp.service.redisMsg; - -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; -import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; - -import java.text.ParseException; -import java.util.concurrent.ConcurrentLinkedQueue; - - -/** - * 接收其他wvp发送流变化通知 - * @author lin - */ -@Component -public class RedisStreamMsgListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class); - - @Autowired - private UserSetting userSetting; - - @Autowired - private ZLMMediaListManager zlmMediaListManager; - - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - - @Override - public void onMessage(Message message, byte[] bytes) { - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class); - if (steamMsgJson == null) { - logger.warn("[收到redis 流变化]消息解析失败"); - continue; - } - String serverId = steamMsgJson.getString("serverId"); - - if (userSetting.getServerId().equals(serverId)) { - // 自己发送的消息忽略即可 - continue; - } - logger.info("[收到redis 流变化]: {}", new String(message.getBody())); - String app = steamMsgJson.getString("app"); - String stream = steamMsgJson.getString("stream"); - boolean register = steamMsgJson.getBoolean("register"); - String mediaServerId = steamMsgJson.getString("mediaServerId"); - OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam(); - onStreamChangedHookParam.setSeverId(serverId); - onStreamChangedHookParam.setApp(app); - onStreamChangedHookParam.setStream(stream); - onStreamChangedHookParam.setRegist(register); - onStreamChangedHookParam.setMediaServerId(mediaServerId); - onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000); - onStreamChangedHookParam.setAliveSecond(0L); - onStreamChangedHookParam.setTotalReaderCount("0"); - onStreamChangedHookParam.setOriginType(0); - onStreamChangedHookParam.setOriginTypeStr("0"); - onStreamChangedHookParam.setOriginTypeStr("unknown"); - ChannelOnlineEvent channelOnlineEventLister = zlmMediaListManager.getChannelOnlineEventLister(app, stream); - if ( channelOnlineEventLister != null) { - try { - channelOnlineEventLister.run(app, stream, serverId);; - } catch (ParseException e) { - logger.error("addPush: ", e); - } - zlmMediaListManager.removedChannelOnlineEventLister(app, stream); - } - }catch (Exception e) { - logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); - logger.error("[REDIS消息-流变化] 异常内容: ", e); - } - } - }); - } - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 1e5e93dd..108bc178 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -219,5 +219,9 @@ public interface IRedisCatchStorage { void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout); + SendRtpItem getWaiteSendRtpItem(String app, String stream); + void sendStartSendRtp(SendRtpItem sendRtpItem); + + void sendPushStreamOnline(SendRtpItem sendRtpItem); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 60084df1..70888377 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -685,9 +685,22 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { redisTemplate.opsForValue().set(key, platformPlayTimeout); } + @Override + public SendRtpItem getWaiteSendRtpItem(String app, String stream) { + String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream; + return (SendRtpItem)redisTemplate.opsForValue().get(key); + } + @Override public void sendStartSendRtp(SendRtpItem sendRtpItem) { String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream(); redisTemplate.opsForValue().set(key, JSON.toJSONString(sendRtpItem)); } + + @Override + public void sendPushStreamOnline(SendRtpItem sendRtpItem) { + String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED; + logger.info("[redis发送通知] 流上线 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId()); + redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem)); + } }