From a71063dd1fc25d99486b36ba65c3081a3c8c7c01 Mon Sep 17 00:00:00 2001 From: lawrencehj <1934378145@qq.com> Date: Sun, 14 Mar 2021 21:20:47 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=B8=8A=E7=BA=A7=E7=82=B9?= =?UTF-8?q?=E6=92=AD=E5=81=9C=E6=AD=A2=E5=90=8E=E9=80=9A=E7=9F=A5=E8=AE=BE?= =?UTF-8?q?=E5=A4=87=E5=81=9C=E6=AD=A2=E6=8E=A8=E6=B5=81=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=EF=BC=8C=E5=B9=B6=E8=87=AA=E5=8A=A8=E4=B8=8E=E6=9C=AC=E5=9C=B0?= =?UTF-8?q?=E6=92=AD=E6=94=BE=E5=8D=8F=E5=90=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/transmit/SIPProcessorFactory.java | 1 + .../request/impl/ByeRequestProcessor.java | 37 ++++++++++++++----- .../vmp/media/zlm/ZLMHttpHookListener.java | 25 ++++++++----- .../vmp/media/zlm/ZLMRTPServerFactory.java | 10 +++++ .../iot/vmp/storager/IRedisCatchStorage.java | 13 +++++++ .../storager/impl/RedisCatchStorageImpl.java | 26 +++++++++++++ 6 files changed, 93 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java index b3b2fbaf..de9d8377 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java @@ -156,6 +156,7 @@ public class SIPProcessorFactory { processor.setRequestEvent(evt); processor.setRedisCatchStorage(redisCatchStorage); processor.setZlmrtpServerFactory(zlmrtpServerFactory); + processor.setSIPCommander(cmder); return processor; } else if (Request.CANCEL.equals(method)) { CancelRequestProcessor processor = new CancelRequestProcessor(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java index a14a4cc6..c9ea567a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java @@ -1,29 +1,38 @@ package com.genersoft.iot.vmp.gb28181.transmit.request.impl; +import javax.sip.address.SipURI; import javax.sip.Dialog; import javax.sip.DialogState; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; +import javax.sip.header.FromHeader; +import javax.sip.header.HeaderAddress; +import javax.sip.header.ToHeader; import javax.sip.message.Response; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import org.apache.log4j.Logger; + import java.text.ParseException; import java.util.HashMap; import java.util.Map; /** * @Description: BYE请求处理器 - * @author: swwheihei - * @date: 2020年5月3日 下午5:32:05 + * @author: lawrencehj + * @date: 2021年3月9日 */ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { - private IRedisCatchStorage redisCatchStorage; + private ISIPCommander cmder; + + private IRedisCatchStorage redisCatchStorage; private ZLMRTPServerFactory zlmrtpServerFactory; @@ -38,10 +47,8 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { Dialog dialog = evt.getDialog(); if (dialog == null) return; if (dialog.getState().equals(DialogState.TERMINATED)) { - String remoteUri = dialog.getRemoteParty().getURI().toString(); - String localUri = dialog.getLocalParty().getURI().toString(); - String platformGbId = remoteUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@")); - String channelId = localUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@")); + String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); + String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId); String streamId = sendRtpItem.getStreamId(); Map param = new HashMap<>(); @@ -50,6 +57,11 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { param.put("stream",streamId); System.out.println("停止向上级推流:" + streamId); zlmrtpServerFactory.stopSendRtpStream(param); + redisCatchStorage.deleteSendRTPServer(platformGbId, channelId); + if (zlmrtpServerFactory.totalReaderCount(streamId) == 0) { + System.out.println(streamId + "无其它观看者,通知设备停止推流"); + cmder.streamByeCmd(streamId); + } } } catch (SipException e) { e.printStackTrace(); @@ -58,8 +70,6 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { } catch (ParseException e) { e.printStackTrace(); } - // TODO 优先级99 Bye Request消息实现,此消息一般为级联消息,上级给下级发送视频停止指令 - } /*** @@ -89,4 +99,13 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { public void setZlmrtpServerFactory(ZLMRTPServerFactory zlmrtpServerFactory) { this.zlmrtpServerFactory = zlmrtpServerFactory; } + + public ISIPCommander getSIPCommander() { + return cmder; + } + + public void setSIPCommander(ISIPCommander cmder) { + this.cmder = cmder; + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 90b53695..51f61eff 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -267,20 +267,25 @@ public class ZLMHttpHookListener { } String streamId = json.getString("stream"); - - cmder.streamByeCmd(streamId); StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); - if (streamInfo!=null){ - redisCatchStorage.stopPlay(streamInfo); - storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); - }else{ - streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); - redisCatchStorage.stopPlayback(streamInfo); - } - + JSONObject ret = new JSONObject(); ret.put("code", 0); ret.put("close", true); + + if (streamInfo != null) { + if (redisCatchStorage.isChannelSendingRTP(streamInfo.getChannelId())) { + ret.put("close", false); + } else { + cmder.streamByeCmd(streamId); + redisCatchStorage.stopPlay(streamInfo); + storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); + } + }else{ + cmder.streamByeCmd(streamId); + streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); + redisCatchStorage.stopPlayback(streamInfo); + } return new ResponseEntity(ret.toString(),HttpStatus.OK); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 00951ba2..1f1693df 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -152,6 +152,16 @@ public class ZLMRTPServerFactory { return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")); } + /** + * 查询转推的流是否有其它观看者 + * @param streamId + * @return + */ + public int totalReaderCount(String streamId) { + JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo("rtp", "rtmp", streamId); + return mediaInfo.getInteger("totalReaderCount"); + } + /** * 调用zlm RESTful API —— stopSendRtp */ diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index ca70620f..90611846 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -89,4 +89,17 @@ public interface IRedisCatchStorage { */ SendRtpItem querySendRTPServer(String platformGbId, String channelId); + /** + * 删除RTP推送信息缓存 + * @param platformGbId + * @param channelId + */ + void deleteSendRTPServer(String platformGbId, String channelId); + + /** + * 查询某个通道是否存在上级点播(RTP推送) + * @param channelId + */ + boolean isChannelSendingRTP(String channelId); + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 6153e5f7..3feb347e 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -225,4 +225,30 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { return (SendRtpItem)redis.get(key); } + /** + * 删除RTP推送信息缓存 + * @param platformGbId + * @param channelId + */ + @Override + public void deleteSendRTPServer(String platformGbId, String channelId) { + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + platformGbId + "_" + channelId; + redis.del(key); + } + + /** + * 查询某个通道是否存在上级点播(RTP推送) + * @param channelId + */ + @Override + public boolean isChannelSendingRTP(String channelId) { + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + "*_" + channelId; + List RtpStreams = redis.scan(key); + if (RtpStreams.size() > 0) { + return true; + } else { + return false; + } + } + }