From dc841a9dcf3bbfb251114e15887e93c28271d221 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 12 Dec 2023 17:57:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/GB28181ResourceServiceImpl.java | 20 +- .../iot/vmp/gb28181/bean/Gb28181Sdp.java | 140 +++- .../iot/vmp/gb28181/event/EventPublisher.java | 7 - .../cmd/ISIPCommanderForPlatform.java | 2 +- .../cmd/impl/SIPCommanderFroPlatform.java | 270 +++---- .../request/impl/ByeRequestProcessor.java | 2 +- .../request/impl/InviteRequestProcessor.java | 679 ++++++++---------- .../NotifyRequestForCatalogProcessor.java | 2 - .../query/cmd/CatalogQueryMessageHandler.java | 3 +- .../iot/vmp/gb28181/utils/SipUtils.java | 18 +- .../vmp/media/zlm/ZLMHttpHookListener.java | 10 - .../iot/vmp/media/zlm/ZLMServerFactory.java | 3 +- .../media/zlm/service/IReceiveRtpService.java | 8 + .../media/zlm/service/ISendRtpService.java | 7 + .../zlm/service/impl/SendRtpServiceImpl.java | 50 ++ .../vmp/service/ICommonGbChannelService.java | 2 + .../vmp/service/IPlatformChannelService.java | 5 + .../vmp/service/IResourcePlayCallback.java | 7 +- .../iot/vmp/service/IResourceService.java | 10 + .../impl/CommonGbChannelServiceImpl.java | 5 + .../impl/DeviceChannelServiceImpl.java | 6 + .../impl/PlatformChannelServiceImpl.java | 5 +- .../service/impl/StreamPushServiceImpl.java | 32 +- .../vmp/storager/dao/StreamPushMapper.java | 10 +- 24 files changed, 714 insertions(+), 589 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/media/zlm/service/IReceiveRtpService.java create mode 100644 src/main/java/com/genersoft/iot/vmp/media/zlm/service/ISendRtpService.java create mode 100644 src/main/java/com/genersoft/iot/vmp/media/zlm/service/impl/SendRtpServiceImpl.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java index 136f85df..540d8fed 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java @@ -57,20 +57,20 @@ public class GB28181ResourceServiceImpl implements IResourceService { assert callback != null; if (!CommonGbChannelType.GB28181.equals(commonGbChannel.getType())) { logger.warn("[资源类-国标28181] 收到播放通道: {} 时发现类型不为28181", commonGbChannel.getCommonGbId()); - callback.call(commonGbChannel, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); + callback.call(commonGbChannel, null, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); return; } DeviceChannel channel = deviceChannelMapper.getChannelByCommonChannelId(commonGbChannel.getCommonGbId()); if (channel == null) { logger.warn("[资源类-国标28181] 收到播放通道: {} 时未找到国标通道", commonGbChannel.getCommonGbId()); - callback.call(commonGbChannel, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); + callback.call(commonGbChannel, null, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); return; } Device device = deviceMapper.getDeviceByDeviceId(channel.getDeviceId()); if (device == null) { logger.warn("[资源类-国标28181] 收到播放通道: {} 时未找到通道 {} 所属的国标设备", commonGbChannel.getCommonGbId(), channel.getDeviceId()); - callback.call(commonGbChannel, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); + callback.call(commonGbChannel, null, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); return; } MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device); @@ -78,10 +78,10 @@ public class GB28181ResourceServiceImpl implements IResourceService { if (code == InviteErrorCode.SUCCESS.getCode()) { if (data != null) { StreamInfo streamInfo = (StreamInfo)data; - callback.call(commonGbChannel, ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + callback.call(commonGbChannel, mediaServerItem, ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); } }else { - callback.call(commonGbChannel, code, msg, null); + callback.call(commonGbChannel, null, code, msg, null); } }); } @@ -91,7 +91,7 @@ public class GB28181ResourceServiceImpl implements IResourceService { if (!CommonGbChannelType.GB28181.equals(commonGbChannel.getType())) { logger.warn("[资源类-国标28181] 收到停止播放通道: {} 时发现类型不为28181", commonGbChannel.getCommonGbId()); if (callback != null) { - callback.call(commonGbChannel, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); + callback.call(commonGbChannel,null, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); } return; } @@ -99,7 +99,7 @@ public class GB28181ResourceServiceImpl implements IResourceService { if (channel == null) { logger.warn("[资源类-国标28181] 收到停止播放通道: {} 时未找到国标通道", commonGbChannel.getCommonGbId()); if (callback != null) { - callback.call(commonGbChannel, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); + callback.call(commonGbChannel, null, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); } return; } @@ -107,7 +107,7 @@ public class GB28181ResourceServiceImpl implements IResourceService { playService.stop(channel.getDeviceId(), channel.getChannelId()); } catch (ControllerException exception) { if (callback != null) { - callback.call(commonGbChannel, exception.getCode(), exception.getMsg(), null); + callback.call(commonGbChannel, null,exception.getCode(), exception.getMsg(), null); } } } @@ -119,4 +119,8 @@ public class GB28181ResourceServiceImpl implements IResourceService { return false; } + @Override + public void streamOffline(String app, String streamId) { + // TODO + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java index 4b9e26a0..c336462f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java @@ -1,6 +1,14 @@ package com.genersoft.iot.vmp.gb28181.bean; +import gov.nist.javax.sdp.TimeDescriptionImpl; +import gov.nist.javax.sdp.fields.TimeField; + +import javax.sdp.Media; +import javax.sdp.MediaDescription; +import javax.sdp.SdpException; import javax.sdp.SessionDescription; +import java.time.Instant; +import java.util.Vector; /** * 28181 的SDP解析器 @@ -11,11 +19,67 @@ public class Gb28181Sdp { private String mediaDescription; - public static Gb28181Sdp getInstance(SessionDescription baseSdb, String ssrc, String mediaDescription) { + private Long startTime = null; + private Long stopTime = null; + + private boolean tcp; + private boolean tcpActive; + + private String sdpIp; + + private Integer sdpPort; + + private String username; + private String addressStr; + + private Integer downloadSpeed; + + public static Gb28181Sdp getInstance(SessionDescription baseSdb, String ssrc, String mediaDescriptionStr) throws SdpException { Gb28181Sdp gb28181Sdp = new Gb28181Sdp(); gb28181Sdp.setBaseSdb(baseSdb); gb28181Sdp.setSsrc(ssrc); - gb28181Sdp.setMediaDescription(mediaDescription); + gb28181Sdp.setMediaDescription(mediaDescriptionStr); + + if (baseSdb.getTimeDescriptions(false) != null && baseSdb.getTimeDescriptions(false).size() > 0) { + TimeDescriptionImpl timeDescription = (TimeDescriptionImpl) (baseSdb.getTimeDescriptions(false).get(0)); + TimeField startTimeFiled = (TimeField) timeDescription.getTime(); + Long startTime = startTimeFiled.getStartTime(); + Long stopTime = startTimeFiled.getStopTime(); + gb28181Sdp.setStartTime(startTime); + gb28181Sdp.setStopTime(stopTime); + } + // 获取支持的格式 + Vector mediaDescriptions = baseSdb.getMediaDescriptions(true); + + for (Object description : mediaDescriptions) { + MediaDescription mediaDescription = (MediaDescription) description; + gb28181Sdp.setDownloadSpeed(Integer.parseInt(mediaDescription.getAttribute("downloadspeed"))); + + Media media = mediaDescription.getMedia(); + Vector mediaFormats = media.getMediaFormats(false); + // 查看是否支持PS 负载96 + if (mediaFormats.contains("96")) { + gb28181Sdp.setSdpPort(media.getMediaPort()); + String protocol = media.getProtocol(); + + // 区分TCP发流还是udp, 当前默认udp + if ("TCP/RTP/AVP".equalsIgnoreCase(protocol)) { + String setup = mediaDescription.getAttribute("setup"); + if (setup != null) { + gb28181Sdp.setTcp(true); + if ("active".equalsIgnoreCase(setup)) { + gb28181Sdp.setTcpActive(true); + } else if ("passive".equalsIgnoreCase(setup)) { + gb28181Sdp.setTcpActive(false); + } + } + } + break; + } + } + + gb28181Sdp.setUsername(baseSdb.getOrigin().getUsername()); + gb28181Sdp.setAddressStr(baseSdb.getConnection().getAddress()); return gb28181Sdp; } @@ -43,4 +107,76 @@ public class Gb28181Sdp { public void setMediaDescription(String mediaDescription) { this.mediaDescription = mediaDescription; } + + public Long getStartTime() { + return startTime; + } + + public void setStartTime(Long startTime) { + this.startTime = startTime; + } + + public Long getStopTime() { + return stopTime; + } + + public void setStopTime(Long stopTime) { + this.stopTime = stopTime; + } + + public boolean isTcp() { + return tcp; + } + + public void setTcp(boolean tcp) { + this.tcp = tcp; + } + + public boolean isTcpActive() { + return tcpActive; + } + + public void setTcpActive(boolean tcpActive) { + this.tcpActive = tcpActive; + } + + public String getSdpIp() { + return sdpIp; + } + + public void setSdpIp(String sdpIp) { + this.sdpIp = sdpIp; + } + + public Integer getSdpPort() { + return sdpPort; + } + + public void setSdpPort(Integer sdpPort) { + this.sdpPort = sdpPort; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getAddressStr() { + return addressStr; + } + + public void setAddressStr(String addressStr) { + this.addressStr = addressStr; + } + + public Integer getDownloadSpeed() { + return downloadSpeed; + } + + public void setDownloadSpeed(Integer downloadSpeed) { + this.downloadSpeed = downloadSpeed; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index 0cdd08e3..902c2339 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -90,13 +90,6 @@ public class EventPublisher { applicationEventPublisher.publishEvent(outEvent); } - - public void catalogEventPublishForStream(Integer platformId, GbStream gbStream, String type) { - List gbStreamList = new ArrayList<>(); - gbStreamList.add(gbStream); - catalogEventPublishForStream(platformId, gbStreamList, type); - } - public void recordEndEventPush(RecordInfo recordInfo) { RecordEndEvent outEvent = new RecordEndEvent(this); outEvent.setRecordInfo(recordInfo); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java index 544e7deb..17e6a28a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java @@ -51,7 +51,7 @@ public interface ISIPCommanderForPlatform { * @return */ void catalogQuery(DeviceChannel channel, ParentPlatform parentPlatform, String sn, String fromTag, int size) throws SipException, InvalidArgumentException, ParseException; - void catalogQuery(List channels, ParentPlatform parentPlatform, String sn, String fromTag) throws InvalidArgumentException, ParseException, SipException; + void catalogQuery(List channels, ParentPlatform parentPlatform, String sn, String fromTag) throws InvalidArgumentException, ParseException, SipException; /** * 向上级回复DeviceInfo查询信息 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 933153f2..019789dd 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -185,15 +185,15 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } @Override - public void catalogQuery(List channels, ParentPlatform parentPlatform, String sn, String fromTag) throws InvalidArgumentException, ParseException, SipException { + public void catalogQuery(List channels, ParentPlatform parentPlatform, String sn, String fromTag) throws InvalidArgumentException, ParseException, SipException { if ( parentPlatform ==null) { return ; } sendCatalogResponse(channels, parentPlatform, sn, fromTag, 0, true); } - private String getCatalogXml(List channels, String sn, ParentPlatform parentPlatform, int size) { + private String getCatalogXml(List channels, String sn, ParentPlatform parentPlatform, int size) { String characterSet = parentPlatform.getCharacterSet(); - StringBuffer catalogXml = new StringBuffer(600); + StringBuilder catalogXml = new StringBuilder(600); catalogXml.append("\r\n") .append("\r\n") .append("Catalog\r\n") @@ -201,161 +201,108 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { .append("" + parentPlatform.getDeviceGBId() + "\r\n") .append("" + size + "\r\n") .append("\r\n"); - if (channels.size() > 0) { - for (DeviceChannel channel : channels) { - if (parentPlatform.getServerGBId().equals(channel.getParentId())) { - channel.setParentId(parentPlatform.getDeviceGBId()); - } + if (!channels.isEmpty()) { + for (CommonGbChannel channel : channels) { catalogXml.append("\r\n"); - // 行政区划分组只需要这两项就可以 - catalogXml.append("" + channel.getChannelId() + "\r\n"); - catalogXml.append("" + channel.getName() + "\r\n"); - if (channel.getChannelId().length() <= 8) { - catalogXml.append("\r\n"); - continue; - }else { - if (channel.getChannelId().length() != 20) { - catalogXml.append("\r\n"); - logger.warn("[编号长度异常] {} 长度错误,请使用20位长度的国标编号,当前长度:{}", channel.getChannelId(), channel.getChannelId().length()); - catalogXml.append("\r\n"); - continue; - } - switch (Integer.parseInt(channel.getChannelId().substring(10, 13))){ - case 200: -// catalogXml.append("三永华通\r\n"); -// GitUtil gitUtil = SpringBeanFactory.getBean("gitUtil"); -// String model = (gitUtil == null || gitUtil.getBuildVersion() == null)?"1.0": gitUtil.getBuildVersion(); -// catalogXml.append("" + model + "\r\n"); -// catalogXml.append("三永华通\r\n"); - if (channel.getCivilCode() != null) { - catalogXml.append(""+channel.getCivilCode()+"\r\n"); - }else { - catalogXml.append("\r\n"); - } - - catalogXml.append("1\r\n"); - catalogXml.append("0\r\n"); - break; - case 215: - if (!ObjectUtils.isEmpty(channel.getParentId())) { - catalogXml.append("" + channel.getParentId() + "\r\n"); - } - - break; - case 216: - if (!ObjectUtils.isEmpty(channel.getParentId())) { - catalogXml.append("" + channel.getParentId() + "\r\n"); - }else { - catalogXml.append("\r\n"); - } - if (!ObjectUtils.isEmpty(channel.getBusinessGroupId())) { - catalogXml.append("" + channel.getBusinessGroupId() + "\r\n"); - }else { - catalogXml.append("\r\n"); - } - break; - default: - // 通道项 - if (channel.getManufacture() != null) { - catalogXml.append("" + channel.getManufacture() + "\r\n"); - }else { - catalogXml.append("\r\n"); - } - if (channel.getSecrecy() != null) { - catalogXml.append("" + channel.getSecrecy() + "\r\n"); - }else { - catalogXml.append("\r\n"); - } - catalogXml.append("" + channel.getRegisterWay() + "\r\n"); - if (channel.getModel() != null) { - catalogXml.append("" + channel.getModel() + "\r\n"); - }else { - catalogXml.append("\r\n"); - } - if (channel.getOwner() != null) { - catalogXml.append("" + channel.getOwner()+ "\r\n"); - }else { - catalogXml.append("\r\n"); - } - if (channel.getCivilCode() != null) { - catalogXml.append("" + channel.getCivilCode() + "\r\n"); - }else { - catalogXml.append("\r\n"); - } - if (channel.getAddress() == null) { - catalogXml.append("
\r\n"); - }else { - catalogXml.append("
" + channel.getAddress() + "
\r\n"); - } - if (!ObjectUtils.isEmpty(channel.getParentId())) { - catalogXml.append("" + channel.getParentId() + "\r\n"); - }else { - catalogXml.append("\r\n"); - } - if (!ObjectUtils.isEmpty(channel.getBlock())) { - catalogXml.append("" + channel.getBlock() + "\r\n"); - }else { - catalogXml.append("\r\n"); - } - if (!ObjectUtils.isEmpty(channel.getSafetyWay())) { - catalogXml.append("" + channel.getSafetyWay() + "\r\n"); - }else { - catalogXml.append("\r\n"); - } - if (!ObjectUtils.isEmpty(channel.getCertNum())) { - catalogXml.append("" + channel.getCertNum() + "\r\n"); - }else { - catalogXml.append("\r\n"); - } - if (!ObjectUtils.isEmpty(channel.getCertifiable())) { - catalogXml.append("" + channel.getCertifiable() + "\r\n"); - }else { - catalogXml.append("\r\n"); - } - if (!ObjectUtils.isEmpty(channel.getErrCode())) { - catalogXml.append("" + channel.getErrCode() + "\r\n"); - }else { - catalogXml.append("\r\n"); - } - if (!ObjectUtils.isEmpty(channel.getEndTime())) { - catalogXml.append("" + channel.getEndTime() + "\r\n"); - }else { - catalogXml.append("\r\n"); - } - if (!ObjectUtils.isEmpty(channel.getSecrecy())) { - catalogXml.append("" + channel.getSecrecy() + "\r\n"); - }else { - catalogXml.append("\r\n"); - } - if (!ObjectUtils.isEmpty(channel.getIpAddress())) { - catalogXml.append("" + channel.getIpAddress() + "\r\n"); - }else { - catalogXml.append("\r\n"); - } - catalogXml.append("" + channel.getPort() + "\r\n"); - if (!ObjectUtils.isEmpty(channel.getPassword())) { - catalogXml.append("" + channel.getPassword() + "\r\n"); - }else { - catalogXml.append("\r\n"); - } - if (!ObjectUtils.isEmpty(channel.getPTZType())) { - catalogXml.append("" + channel.getPTZType() + "\r\n"); - }else { - catalogXml.append("\r\n"); - } - catalogXml.append("" + (channel.isStatus() ?"ON":"OFF") + "\r\n"); - - catalogXml.append("" + - (channel.getLongitudeWgs84() != 0? channel.getLongitudeWgs84():channel.getLongitude()) - + "\r\n"); - catalogXml.append("" + - (channel.getLatitudeWgs84() != 0? channel.getLatitudeWgs84():channel.getLatitude()) - + "\r\n"); - break; - - } - catalogXml.append("\r\n"); + catalogXml.append("" + channel.getCommonGbDeviceID() + "\r\n"); + catalogXml.append("" + channel.getCommonGbName() + "\r\n"); + if (!ObjectUtils.isEmpty(channel.getCommonGbManufacturer())) { + catalogXml.append("" + channel.getCommonGbManufacturer() + "\r\n"); } + if (!ObjectUtils.isEmpty(channel.getCommonGbModel())) { + catalogXml.append("" + channel.getCommonGbModel() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbOwner())) { + catalogXml.append(" " + channel.getCommonGbOwner()+ "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbCivilCode())) { + catalogXml.append(" " + channel.getCommonGbCivilCode()+ "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbBlock())) { + catalogXml.append("" + channel.getCommonGbBlock() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbAddress())) { + catalogXml.append("
" + channel.getCommonGbAddress()+ "
\r\n"); + } + catalogXml.append("" + channel.getCommonGbParental() + "\r\n"); + if (!ObjectUtils.isEmpty(channel.getCommonGbParentID())) { + // 业务分组加上这一项即可,提高兼容性, + catalogXml.append("" + channel.getCommonGbParentID() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbSafetyWay())) { + catalogXml.append("" + channel.getCommonGbSafetyWay() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbRegisterWay())) { + catalogXml.append("" + channel.getCommonGbRegisterWay() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbCertNum())) { + catalogXml.append("" + channel.getCommonGbCertNum() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbCertifiable())) { + catalogXml.append("" + channel.getCommonGbCertifiable() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbErrCode())) { + catalogXml.append("" + channel.getCommonGbErrCode() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbEndTime())) { + catalogXml.append("" + channel.getCommonGbEndTime() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbSecrecy())) { + catalogXml.append("" + channel.getCommonGbSecrecy() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbIPAddress())) { + catalogXml.append("" + channel.getCommonGbIPAddress() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbPort())) { + catalogXml.append("" + channel.getCommonGbPort() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbPassword())) { + catalogXml.append("" + channel.getCommonGbPassword() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbStatus())) { + catalogXml.append("" + (channel.getCommonGbStatus() ? "ON" : "OFF") + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbLongitude())) { + catalogXml.append("" + channel.getCommonGbLongitude() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbLatitude())) { + catalogXml.append("" + channel.getCommonGbLatitude() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbPtzType())) { + catalogXml.append("" + channel.getCommonGbPtzType() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbPositionType())) { + catalogXml.append("" + channel.getCommonGbPositionType() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbRoomType())) { + catalogXml.append("" + channel.getCommonGbRoomType() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbUseType())) { + catalogXml.append("" + channel.getCommonGbUseType() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbSupplyLightType())) { + catalogXml.append("" + channel.getCommonGbSupplyLightType() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbDirectionType())) { + catalogXml.append("" + channel.getCommonGbDirectionType() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbResolution())) { + catalogXml.append("" + channel.getCommonGbResolution() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbBusinessGroupID())) { + catalogXml.append("" + channel.getCommonGbBusinessGroupID() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbDownloadSpeed())) { + catalogXml.append("" + channel.getCommonGbDownloadSpeed() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getSVCSpaceSupportMode())) { + catalogXml.append("" + channel.getSVCSpaceSupportMode() + "\r\n"); + } + if (!ObjectUtils.isEmpty(channel.getCommonGbSVCTimeSupportMode())) { + catalogXml.append("" + channel.getCommonGbSVCTimeSupportMode() + "\r\n"); + } + + catalogXml.append("\r\n"); } } @@ -364,17 +311,17 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { return catalogXml.toString(); } - private void sendCatalogResponse(List channels, ParentPlatform parentPlatform, String sn, String fromTag, int index, boolean sendAfterResponse) throws SipException, InvalidArgumentException, ParseException { + private void sendCatalogResponse(List channels, ParentPlatform parentPlatform, String sn, String fromTag, int index, boolean sendAfterResponse) throws SipException, InvalidArgumentException, ParseException { if (index >= channels.size()) { return; } - List deviceChannels; + List channelList; if (index + parentPlatform.getCatalogGroup() < channels.size()) { - deviceChannels = channels.subList(index, index + parentPlatform.getCatalogGroup()); + channelList = channels.subList(index, index + parentPlatform.getCatalogGroup()); }else { - deviceChannels = channels.subList(index, channels.size()); + channelList = channels.subList(index, channels.size()); } - String catalogXml = getCatalogXml(deviceChannels, sn, parentPlatform, channels.size()); + String catalogXml = getCatalogXml(channelList, sn, parentPlatform, channels.size()); // callid CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport()); @@ -616,7 +563,6 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { if (channels.size() > 0) { for (CommonGbChannel channel : channels) { catalogXml.append("\r\n"); - // 行政区划分组只需要这两项就可以 catalogXml.append("" + channel.getCommonGbDeviceID() + "\r\n"); catalogXml.append("" + channel.getCommonGbName() + "\r\n"); if (!ObjectUtils.isEmpty(channel.getCommonGbManufacturer())) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index b6aac9c7..ea7a9302 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -138,7 +138,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); } try { - logger.info("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + logger.info("[停止点播] {}", sendRtpItem.getChannelId()); cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null); } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index bc17d59a..4e5ba645 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.CommonGbChannel; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; @@ -68,6 +69,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private IVideoManagerStorage storager; + @Autowired + private IPlatformChannelService platformChannelService; + @Autowired private IStreamPushService streamPushService; @Autowired @@ -76,9 +80,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private IRedisCatchStorage redisCatchStorage; - @Autowired - private IInviteStreamService inviteStreamService; - @Autowired private SSRCFactory ssrcFactory; @@ -91,6 +92,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private IPlayService playService; + @Autowired + private Map resourceServiceMap; + @Autowired private SIPSender sipSender; @@ -147,7 +151,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements return; } - // 查询请求是否来自上级平台\设备 ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); if (platform == null) { @@ -155,83 +158,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } else { // 查询平台下是否有该通道 - DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); - GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); - PlatformCatalog catalog = storager.getCatalog(requesterId, channelId); - - MediaServerItem mediaServerItem = null; - StreamPush streamPushItem = null; - StreamProxy proxyByAppAndStream =null; - // 不是通道可能是直播流 - if (channel != null && gbStream == null) { - // 通道存在,发100,TRYING - try { - responseAck(request, Response.TRYING); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite TRYING: {}", e.getMessage()); - } - } else if (channel == null && gbStream != null) { - - String mediaServerId = gbStream.getMediaServerId(); - mediaServerItem = mediaServerService.getOne(mediaServerId); - if (mediaServerItem == null) { - if ("proxy".equals(gbStream.getStreamType())) { - logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); - try { - responseAck(request, Response.GONE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite GONE: {}", e.getMessage()); - } - return; - } else { - streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); - if (streamPushItem != null) { - mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); - } - if (mediaServerItem == null) { - mediaServerItem = mediaServerService.getDefaultMediaServer(); - } - } - } else { - if ("push".equals(gbStream.getStreamType())) { - streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); - if (streamPushItem == null) { - logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); - try { - responseAck(request, Response.GONE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite GONE: {}", e.getMessage()); - } - return; - } - }else if("proxy".equals(gbStream.getStreamType())){ - proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream()); - if (proxyByAppAndStream == null) { - logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); - try { - responseAck(request, Response.GONE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite GONE: {}", e.getMessage()); - } - return; - } - } - } - try { - responseAck(request, Response.CALL_IS_BEING_FORWARDED); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite CALL_IS_BEING_FORWARDED: {}", e.getMessage()); - } - } else if (catalog != null) { - try { - // 目录不支持点播 - responseAck(request, Response.BAD_REQUEST, "catalog channel can not play"); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite 目录不支持点播: {}", e.getMessage()); - } - return; - } else { - logger.info("通道不存在,返回404: {}", channelId); + CommonGbChannel channel = platformChannelService.queryChannelByPlatformIdAndChannelDeviceId(platform.getId(), channelId); + if (channel == null) { + logger.info("[国标级联] 上级点播 通道不存在,返回404: {}", channelId); try { // 通道不存在,发404,资源不存在 responseAck(request, Response.NOT_FOUND); @@ -240,59 +169,30 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } return; } + IResourceService resourceService = resourceServiceMap.get(channel.getType()); + if (resourceService == null) { + logger.info("[国标级联] 上级点播 未找到类型{}的处理类: {}", channel.getType(), channelId); + try { + // 通道不存在,发404,资源不存在 + responseAck(request, Response.NOT_IMPLEMENTED); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 通道不存在: {}", e.getMessage()); + } + return; + } + // 通道存在,发100,TRYING + try { + responseAck(request, Response.TRYING); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite TRYING: {}", e.getMessage()); + } + // 解析sdp消息, 使用jainsip 自带的sdp解析方式 String contentString = new String(request.getRawContent()); Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString); - SessionDescription sdp = gb28181Sdp.getBaseSdb(); - String sessionName = sdp.getSessionName().getValue(); - Long startTime = null; - Long stopTime = null; - Instant start = null; - Instant end = null; - if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) { - TimeDescriptionImpl timeDescription = (TimeDescriptionImpl) (sdp.getTimeDescriptions(false).get(0)); - TimeField startTimeFiled = (TimeField) timeDescription.getTime(); - startTime = startTimeFiled.getStartTime(); - stopTime = startTimeFiled.getStopTime(); - - start = Instant.ofEpochSecond(startTime); - end = Instant.ofEpochSecond(stopTime); - } - // 获取支持的格式 - Vector mediaDescriptions = sdp.getMediaDescriptions(true); - // 查看是否支持PS 负载96 - //String ip = null; - int port = -1; - boolean mediaTransmissionTCP = false; - Boolean tcpActive = null; - for (Object description : mediaDescriptions) { - MediaDescription mediaDescription = (MediaDescription) description; - Media media = mediaDescription.getMedia(); - - Vector mediaFormats = media.getMediaFormats(false); - if (mediaFormats.contains("96")) { - port = media.getMediaPort(); - //String mediaType = media.getMediaType(); - String protocol = media.getProtocol(); - - // 区分TCP发流还是udp, 当前默认udp - if ("TCP/RTP/AVP".equalsIgnoreCase(protocol)) { - String setup = mediaDescription.getAttribute("setup"); - if (setup != null) { - mediaTransmissionTCP = true; - if ("active".equalsIgnoreCase(setup)) { - tcpActive = true; - } else if ("passive".equalsIgnoreCase(setup)) { - tcpActive = false; - } - } - } - break; - } - } - if (port == -1) { + if (gb28181Sdp.getSdpPort() == 0) { logger.info("不支持的媒体格式,返回415"); // 回复不支持的格式 try { @@ -303,267 +203,316 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } return; } - String username = sdp.getOrigin().getUsername(); - String addressStr = sdp.getConnection().getAddress(); - - - Device device = null; - // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 - if (channel != null) { - device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId); - if (device == null) { - logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel); - try { - responseAck(request, Response.SERVER_INTERNAL_ERROR); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite 未找到设备信息: {}", e.getMessage()); - } - return; - } - mediaServerItem = playService.getNewMediaServerItem(device); - if (mediaServerItem == null) { - logger.warn("未找到可用的zlm"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite BUSY_HERE: {}", e.getMessage()); - } - return; - } - - String ssrc; - if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { - // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 - ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + String sessionName = gb28181Sdp.getBaseSdb().getSessionName().getValue(); + String streamTypeStr = "UDP"; + if (gb28181Sdp.isTcp()) { + if (gb28181Sdp.isTcpActive()) { + streamTypeStr = "TCP-ACTIVE"; }else { - ssrc = gb28181Sdp.getSsrc(); + streamTypeStr = "TCP-PASSIVE"; } - String streamTypeStr = null; - if (mediaTransmissionTCP) { - if (tcpActive) { - streamTypeStr = "TCP-ACTIVE"; + } + logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", + sessionName, gb28181Sdp.getUsername(), channelId, gb28181Sdp.getAddressStr(), + gb28181Sdp.getSdpPort(), streamTypeStr, gb28181Sdp.getSsrc()); + + IResourcePlayCallback callback = (commonGbChannel, mediaServerItem, code, message, streamInfo) -> { + if (code != 0) { + logger.info("[上级Invite] 获取资源流失败。{}, {}/{}", message, streamInfo.getApp(), streamInfo.getStream()); + try { + cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + } + return; + } + try { + String ssrc; + logger.info("[上级Invite] 收到资源推流。 回复200OK(SDP), {}/{}", streamInfo.getApp(), streamInfo.getStream()); + if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { + // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 + ssrc = "Play".equalsIgnoreCase(sessionName) ? + ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : + ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); }else { - streamTypeStr = "TCP-PASSIVE"; + ssrc = gb28181Sdp.getSsrc(); } - }else { - streamTypeStr = "UDP"; - } - logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc); - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp()); - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - if (sendRtpItem == null) { - logger.warn("服务器端口资源不足"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); + SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, + gb28181Sdp.getAddressStr(), gb28181Sdp.getSdpPort(), ssrc, requesterId, + channelId, gb28181Sdp.isTcp(), platform.isRtcp()); + if (sendRtpItem == null) { + logger.warn("[上级Invite] 获取发流端口资源失败 服务器端口资源可能不足"); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); + } + return; + } + sendRtpItem.setTcpActive(gb28181Sdp.isTcpActive()); + sendRtpItem.setCallId(callIdHeader.getCallId()); + if ("Play".equalsIgnoreCase(sessionName)) { + sendRtpItem.setPlayType(InviteStreamType.PLAY); + }else if ("Playback".equalsIgnoreCase(sessionName)) { + sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); + }else if ("Download".equalsIgnoreCase(sessionName)) { + sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD); } - return; - } - sendRtpItem.setCallId(callIdHeader.getCallId()); - sendRtpItem.setPlayType("Play".equalsIgnoreCase(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); - - Long finalStartTime = startTime; - Long finalStopTime = stopTime; - ErrorCallback hookEvent = (code, msg, data) -> { - StreamInfo streamInfo = (StreamInfo)data; - MediaServerItem mediaServerItemInUSe = mediaServerService.getOne(streamInfo.getMediaServerId()); - logger.info("[上级Invite]下级已经开始推流。 回复200OK(SDP), {}/{}", streamInfo.getApp(), streamInfo.getStream()); - // * 0 等待设备推流上来 - // * 1 下级已经推流,等待上级平台回复ack - // * 2 推流中 sendRtpItem.setStatus(1); redisCatchStorage.updateSendRTPSever(sendRtpItem); StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); - content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); + content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); content.append("s=" + sessionName + "\r\n"); - content.append("c=IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); - if ("Playback".equalsIgnoreCase(sessionName)) { - content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); + content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); + if (!"Play".equalsIgnoreCase(sessionName)) { + content.append("t=" + gb28181Sdp.getStartTime() + " " + gb28181Sdp.getStopTime() + "\r\n"); } else { content.append("t=0 0\r\n"); } - int localPort = sendRtpItem.getLocalPort(); - if (localPort == 0) { - // 非严格模式端口不统一, 增加兼容性,修改为一个不为0的端口 - localPort = new Random().nextInt(65535) + 1; - } - content.append("m=video " + localPort + " RTP/AVP 96\r\n"); + content.append("m=video " + sendRtpItem.getLocalPort() + " RTP/AVP 96\r\n"); content.append("a=sendonly\r\n"); content.append("a=rtpmap:96 PS/90000\r\n"); content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); content.append("f=\r\n"); - - try { - // 超时未收到Ack应该回复bye,当前等待时间为10秒 - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("Ack 等待超时"); - mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()); - // 回复bye - try { - cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); - } - }, 60 * 1000); - responseSdpAck(request, content.toString(), platform); - // tcp主动模式,回复sdp后开启监听 - if (sendRtpItem.isTcpActive()) { - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - Map param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStreamId()); - param.put("ssrc", sendRtpItem.getSsrc()); - if (!sendRtpItem.isTcpActive()) { - param.put("dst_url",sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - } - String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; - param.put("is_udp", is_Udp); - param.put("src_port", localPort); - param.put("pt", sendRtpItem.getPt()); - param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); - param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - if (!sendRtpItem.isTcp()) { - // 开启rtcp保活 - param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); - } - JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpStreamForPassive(mediaInfo, param); - if (startSendRtpStreamResult != null) { - startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader); - } + // 超时未收到Ack应该回复bye,当前等待时间为10秒 + dynamicTask.startDelay(callIdHeader.getCallId(), () -> { + logger.info("Ack 等待超时"); + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); + // 回复bye + try { + cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); } - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 回复SdpAck", e); - } - }; - ErrorCallback errorEvent = ((statusCode, msg, data) -> { - // 未知错误。直接转发设备点播的错误 - try { - if (statusCode > 0) { - Response response = getMessageFactory().createResponse(statusCode, evt.getRequest()); - sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response); + }, 60 * 1000); + responseSdpAck(request, content.toString(), platform); + if (sendRtpItem.isTcpActive()) { + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + if (!sendRtpItem.isTcpActive()) { + param.put("dst_url",sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); } - } catch (ParseException | SipException e) { - logger.error("未处理的异常 ", e); - } - }); - sendRtpItem.setApp("rtp"); - if ("Playback".equalsIgnoreCase(sessionName)) { - sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); - String startTimeStr = DateUtil.urlFormatter.format(start); - String endTimeStr = DateUtil.urlFormatter.format(end); - String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr; - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); - sendRtpItem.setStreamId(ssrcInfo.getStream()); - // 写入redis, 超时时回复 - redisCatchStorage.updateSendRTPSever(sendRtpItem); - playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), - DateUtil.formatter.format(end), - (code, msg, data) -> { - if (code == InviteErrorCode.SUCCESS.getCode()){ - hookEvent.run(code, msg, data); - }else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){ - logger.info("[录像回放]超时, 用户:{}, 通道:{}", username, channelId); - redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - errorEvent.run(code, msg, data); - }else { - errorEvent.run(code, msg, data); - } - }); - } else if ("Download".equalsIgnoreCase(sessionName)) { - // 获取指定的下载速度 - Vector sdpMediaDescriptions = sdp.getMediaDescriptions(true); - MediaDescription mediaDescription = null; - String downloadSpeed = "1"; - if (sdpMediaDescriptions.size() > 0) { - mediaDescription = (MediaDescription) sdpMediaDescriptions.get(0); - } - if (mediaDescription != null) { - downloadSpeed = mediaDescription.getAttribute("downloadspeed"); - } - - sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); - sendRtpItem.setStreamId(ssrcInfo.getStream()); - // 写入redis, 超时时回复 - redisCatchStorage.updateSendRTPSever(sendRtpItem); - playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), - DateUtil.formatter.format(end), Integer.parseInt(downloadSpeed), - (code, msg, data) -> { - if (code == InviteErrorCode.SUCCESS.getCode()) { - hookEvent.run(code, msg, data); - } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { - logger.info("[录像下载]超时, 用户:{}, 通道:{}", username, channelId); - redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - errorEvent.run(code, msg, data); - } else { - errorEvent.run(code, msg, data); - } - }); - } else { - - SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> { - if (code == InviteErrorCode.SUCCESS.getCode()) { - hookEvent.run(code, msg, data); - } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { - logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId); - redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - errorEvent.run(code, msg, data); - } else { - errorEvent.run(code, msg, data); + String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; + param.put("is_udp", is_Udp); + param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + if (!sendRtpItem.isTcp()) { + // 开启rtcp保活 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); } - })); - sendRtpItem.setPlayType(InviteStreamType.PLAY); - String streamId = String.format("%s_%s", device.getDeviceId(), channelId); - sendRtpItem.setStreamId(streamId); - sendRtpItem.setSsrc(ssrcInfo.getSsrc()); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - - } - } else if (gbStream != null) { - - String ssrc; - if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { - // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 - ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); - }else { - ssrc = gb28181Sdp.getSsrc(); - } - - if("push".equals(gbStream.getStreamType())) { - if (streamPushItem != null && streamPushItem.isPushIng()) { - // 推流状态 - pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } else { - // 未推流 拉起 - notifyStreamOnline(evt, request,gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } - }else if ("proxy".equals(gbStream.getStreamType())){ - if (null != proxyByAppAndStream) { - if(proxyByAppAndStream.isStatus()){ - pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - }else{ - //开启代理拉流 - notifyStreamOnline(evt, request,gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } - } - - + JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpStreamForPassive(mediaInfo, param); + if (startSendRtpStreamResult != null) { + startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader); + } + } + }catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 回复SdpAck", e); } + }; + if ("Play".equalsIgnoreCase(sessionName)) { + resourceService.startPlay(channel, callback); + }else if ("Playback".equalsIgnoreCase(sessionName)) { + resourceService.startPlayback(channel, gb28181Sdp.getStartTime(), gb28181Sdp.getStopTime(), callback); + }else if ("Download".equalsIgnoreCase(sessionName)) { + resourceService.startDownload(channel, gb28181Sdp.getStartTime(), gb28181Sdp.getStopTime(), + gb28181Sdp.getDownloadSpeed(), callback); } + + + + +// +// Device device = null; +// // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 +// if (channel != null) { +// +// ErrorCallback hookEvent = (code, msg, data) -> { +// StreamInfo streamInfo = (StreamInfo)data; +// MediaServerItem mediaServerItemInUSe = mediaServerService.getOne(streamInfo.getMediaServerId()); +// logger.info("[上级Invite]下级已经开始推流。 回复200OK(SDP), {}/{}", streamInfo.getApp(), streamInfo.getStream()); +// // * 0 等待设备推流上来 +// // * 1 下级已经推流,等待上级平台回复ack +// // * 2 推流中 +// sendRtpItem.setStatus(1); +// redisCatchStorage.updateSendRTPSever(sendRtpItem); +// +// +// +// +// try { +// // 超时未收到Ack应该回复bye,当前等待时间为10秒 +// dynamicTask.startDelay(callIdHeader.getCallId(), () -> { +// logger.info("Ack 等待超时"); +// mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()); +// // 回复bye +// try { +// cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); +// } catch (SipException | InvalidArgumentException | ParseException e) { +// logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); +// } +// }, 60 * 1000); +// responseSdpAck(request, content.toString(), platform); +// // tcp主动模式,回复sdp后开启监听 +// if (sendRtpItem.isTcpActive()) { +// MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); +// Map param = new HashMap<>(12); +// param.put("vhost","__defaultVhost__"); +// param.put("app",sendRtpItem.getApp()); +// param.put("stream",sendRtpItem.getStreamId()); +// param.put("ssrc", sendRtpItem.getSsrc()); +// if (!sendRtpItem.isTcpActive()) { +// param.put("dst_url",sendRtpItem.getIp()); +// param.put("dst_port", sendRtpItem.getPort()); +// } +// String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; +// param.put("is_udp", is_Udp); +// param.put("src_port", localPort); +// param.put("pt", sendRtpItem.getPt()); +// param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); +// param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); +// if (!sendRtpItem.isTcp()) { +// // 开启rtcp保活 +// param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); +// } +// JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpStreamForPassive(mediaInfo, param); +// if (startSendRtpStreamResult != null) { +// startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader); +// } +// } +// } catch (SipException | InvalidArgumentException | ParseException e) { +// logger.error("[命令发送失败] 国标级联 回复SdpAck", e); +// } +// }; +// ErrorCallback errorEvent = ((statusCode, msg, data) -> { +// // 未知错误。直接转发设备点播的错误 +// try { +// if (statusCode > 0) { +// Response response = getMessageFactory().createResponse(statusCode, evt.getRequest()); +// sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response); +// } +// } catch (ParseException | SipException e) { +// logger.error("未处理的异常 ", e); +// } +// }); +// sendRtpItem.setApp("rtp"); +// if ("Playback".equalsIgnoreCase(sessionName)) { +// sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); +// String startTimeStr = DateUtil.urlFormatter.format(start); +// String endTimeStr = DateUtil.urlFormatter.format(end); +// String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr; +// SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); +// sendRtpItem.setStreamId(ssrcInfo.getStream()); +// // 写入redis, 超时时回复 +// redisCatchStorage.updateSendRTPSever(sendRtpItem); +// playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), +// DateUtil.formatter.format(end), +// (code, msg, data) -> { +// if (code == InviteErrorCode.SUCCESS.getCode()){ +// hookEvent.run(code, msg, data); +// }else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){ +// logger.info("[录像回放]超时, 用户:{}, 通道:{}", username, channelId); +// redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); +// errorEvent.run(code, msg, data); +// }else { +// errorEvent.run(code, msg, data); +// } +// }); +// } else if ("Download".equalsIgnoreCase(sessionName)) { +// // 获取指定的下载速度 +// Vector sdpMediaDescriptions = sdp.getMediaDescriptions(true); +// MediaDescription mediaDescription = null; +// String downloadSpeed = "1"; +// if (sdpMediaDescriptions.size() > 0) { +// mediaDescription = (MediaDescription) sdpMediaDescriptions.get(0); +// } +// if (mediaDescription != null) { +// downloadSpeed = mediaDescription.getAttribute("downloadspeed"); +// } +// +// sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD); +// SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); +// sendRtpItem.setStreamId(ssrcInfo.getStream()); +// // 写入redis, 超时时回复 +// redisCatchStorage.updateSendRTPSever(sendRtpItem); +// playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), +// DateUtil.formatter.format(end), Integer.parseInt(downloadSpeed), +// (code, msg, data) -> { +// if (code == InviteErrorCode.SUCCESS.getCode()) { +// hookEvent.run(code, msg, data); +// } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { +// logger.info("[录像下载]超时, 用户:{}, 通道:{}", username, channelId); +// redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); +// errorEvent.run(code, msg, data); +// } else { +// errorEvent.run(code, msg, data); +// } +// }); +// } else { +// +// SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> { +// if (code == InviteErrorCode.SUCCESS.getCode()) { +// hookEvent.run(code, msg, data); +// } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { +// logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId); +// redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); +// errorEvent.run(code, msg, data); +// } else { +// errorEvent.run(code, msg, data); +// } +// })); +// sendRtpItem.setPlayType(InviteStreamType.PLAY); +// String streamId = String.format("%s_%s", device.getDeviceId(), channelId); +// sendRtpItem.setStreamId(streamId); +// sendRtpItem.setSsrc(ssrcInfo.getSsrc()); +// redisCatchStorage.updateSendRTPSever(sendRtpItem); +// +// } +// } else if (gbStream != null) { +// +// String ssrc; +// if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { +// // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 +// ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); +// }else { +// ssrc = gb28181Sdp.getSsrc(); +// } +// +// if("push".equals(gbStream.getStreamType())) { +// if (streamPushItem != null && streamPushItem.isPushIng()) { +// // 推流状态 +// pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, +// mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); +// } else { +// // 未推流 拉起 +// notifyStreamOnline(evt, request,gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, +// mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); +// } +// }else if ("proxy".equals(gbStream.getStreamType())){ +// if (null != proxyByAppAndStream) { +// if(proxyByAppAndStream.isStatus()){ +// pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, +// mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); +// }else{ +// //开启代理拉流 +// notifyStreamOnline(evt, request,gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive, +// mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); +// } +// } +// +// +// } +// } } } catch (SdpParseException e) { logger.error("sdp解析错误", e); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index 2d5fcd70..793872c8 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -245,8 +245,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent logger.warn("[ NotifyCatalog ] event not found : {}", event ); } - // 转发变化信息 - eventPublisher.catalogEventPublish(null, channel, event); if (updateChannelMap.keySet().size() > 0 || addChannelMap.keySet().size() > 0 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java index e92fceb0..3e9e9fe7 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; @@ -42,7 +43,7 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem private IVideoManagerStorage storager; @Autowired - private SIPCommanderFroPlatform cmderFroPlatform; + private ISIPCommanderForPlatform cmderFroPlatform; @Autowired private IPlatformChannelService platformChannelService; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java index 9fb770ca..9eb9cac4 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java @@ -1,11 +1,10 @@ package com.genersoft.iot.vmp.gb28181.utils; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; -import com.genersoft.iot.vmp.gb28181.bean.Gb28181CodeType; -import com.genersoft.iot.vmp.gb28181.bean.Gb28181Sdp; -import com.genersoft.iot.vmp.gb28181.bean.RemoteAddressInfo; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.GitUtil; +import gov.nist.javax.sdp.TimeDescriptionImpl; +import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sip.address.AddressImpl; import gov.nist.javax.sip.address.SipUri; import gov.nist.javax.sip.header.Subject; @@ -15,21 +14,24 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.ObjectUtils; -import javax.sdp.SdpFactory; -import javax.sdp.SdpParseException; -import javax.sdp.SessionDescription; +import javax.sdp.*; +import javax.sip.InvalidArgumentException; import javax.sip.PeerUnavailableException; +import javax.sip.SipException; import javax.sip.SipFactory; import javax.sip.header.FromHeader; import javax.sip.header.Header; import javax.sip.header.UserAgentHeader; import javax.sip.message.Request; +import javax.sip.message.Response; import java.text.ParseException; +import java.time.Instant; import java.time.LocalDateTime; import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.Vector; /** * @author panlinlin @@ -203,7 +205,7 @@ public class SipUtils { return deviceChannel; } - public static Gb28181Sdp parseSDP(String sdpStr) throws SdpParseException { + public static Gb28181Sdp parseSDP(String sdpStr) throws SdpException { // jainSip不支持y= f=字段, 移除以解析。 int ssrcIndex = sdpStr.indexOf("y="); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index e8865fef..a0966772 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -413,18 +413,8 @@ public class ZLMHttpHookListener { type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType(); redisCatchStorage.removeStream(mediaInfo.getId(), type, param.getApp(), param.getStream()); } - GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); - if (gbStream != null) { -// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); - } zlmMediaListManager.streamOffline(param.getApp(), param.getStream()); } - GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); - if (gbStream != null) { - if (userSetting.isUsePushingAsStatus()) { - eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist()?CatalogEvent.ON:CatalogEvent.OFF); - } - } if (type != null) { // 发送流变化redis消息 JSONObject jsonObject = new JSONObject(); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java index 8c84133f..7d7ebc93 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java @@ -162,7 +162,7 @@ public class ZLMServerFactory { * @return SendRtpItem */ public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, - String deviceId, String channelId, boolean tcp, boolean rtcp){ + String channelId, boolean tcp, boolean rtcp){ int localPort = sendRtpPortManager.getNextPort(serverItem); if (localPort == 0) { @@ -173,7 +173,6 @@ public class ZLMServerFactory { sendRtpItem.setPort(port); sendRtpItem.setSsrc(ssrc); sendRtpItem.setPlatformId(platformId); - sendRtpItem.setDeviceId(deviceId); sendRtpItem.setChannelId(channelId); sendRtpItem.setTcp(tcp); sendRtpItem.setRtcp(rtcp); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/service/IReceiveRtpService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/service/IReceiveRtpService.java new file mode 100644 index 00000000..2535d6dd --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/service/IReceiveRtpService.java @@ -0,0 +1,8 @@ +package com.genersoft.iot.vmp.media.zlm.service; + +/** + * 管理rtp流的接收 + */ +public interface IReceiveRtpService { + +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/service/ISendRtpService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/service/ISendRtpService.java new file mode 100644 index 00000000..9a181098 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/service/ISendRtpService.java @@ -0,0 +1,7 @@ +package com.genersoft.iot.vmp.media.zlm.service; + +/** + * 管理rtp流的发送 + */ +public interface ISendRtpService { +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/service/impl/SendRtpServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/service/impl/SendRtpServiceImpl.java new file mode 100644 index 00000000..6131ef35 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/service/impl/SendRtpServiceImpl.java @@ -0,0 +1,50 @@ +package com.genersoft.iot.vmp.media.zlm.service.impl; + +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.service.ISendRtpService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class SendRtpServiceImpl implements ISendRtpService { + + @Autowired + private SendRtpPortManager sendRtpPortManager; + + /** + * 创建一个国标推流 + * @param serverItem 推流使用的流媒体 + * @param dstIp 目标IP + * @param dstPort 目标端口 + * @param ssrc SSRC + * @param sourceId 推流信息唯一标识 + * @param callId 关联的Invite会话callId + * @param tcp 是否使用TCP推流 + * @param rtcp 是否开启RTCP保活,tcp为false时有效 + * @return SendRtpItem + */ + public SendRtpItem createSendRtpInfo(MediaServerItem serverItem, String dstIp, int dstPort, String ssrc, String sourceId, + String callId, boolean tcp, boolean rtcp){ + +// int localPort = sendRtpPortManager.getNextPort(serverItem); +// if (localPort == 0) { +// return null; +// } +// SendRtpItem sendRtpItem = new SendRtpItem(); +// sendRtpItem.setIp(ip); +// sendRtpItem.setPort(port); +// sendRtpItem.setSsrc(ssrc); +// sendRtpItem.setPlatformId(platformId); +// sendRtpItem.setChannelId(channelId); +// sendRtpItem.setTcp(tcp); +// sendRtpItem.setRtcp(rtcp); +// sendRtpItem.setApp("rtp"); +// sendRtpItem.setLocalPort(localPort); +// sendRtpItem.setServerId(userSetting.getServerId()); +// sendRtpItem.setMediaServerId(serverItem.getId()); +// return sendRtpItem; + return null; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java index f91bce54..ba3c150f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java @@ -87,4 +87,6 @@ public interface ICommonGbChannelService { void deleteByIdList(List commonChannelIdList); void offlineForList(List onlinePushers); + + void onlineForList(List commonChannelIdList); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlatformChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlatformChannelService.java index 5a1e4638..d524e1be 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlatformChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlatformChannelService.java @@ -30,4 +30,9 @@ public interface IPlatformChannelService { * 查询关联了上级平台的所有通道 */ List queryChannelList(ParentPlatform platform); + + /** + * 查询通道 + */ + CommonGbChannel queryChannelByPlatformIdAndChannelDeviceId(Integer platformId, String channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IResourcePlayCallback.java b/src/main/java/com/genersoft/iot/vmp/service/IResourcePlayCallback.java index 08f3fce7..76850988 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IResourcePlayCallback.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IResourcePlayCallback.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service; import com.genersoft.iot.vmp.common.CommonGbChannel; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; /** * 资源播放回调 @@ -10,10 +11,6 @@ public interface IResourcePlayCallback { /** * 资源播放回调 - * @param commonGbChannel 通道 - * @param code - * @param message - * @param streamInfo */ - void call(CommonGbChannel commonGbChannel, int code, String message, StreamInfo streamInfo); + void call(CommonGbChannel commonGbChannel, MediaServerItem mediaServerItem, int code, String message, StreamInfo streamInfo); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IResourceService.java b/src/main/java/com/genersoft/iot/vmp/service/IResourceService.java index 7ba811a6..00befa24 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IResourceService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IResourceService.java @@ -42,4 +42,14 @@ public interface IResourceService { * 流离线 */ void streamOffline(String app, String streamId); + + /** + * 录像回放 + */ + void startPlayback(CommonGbChannel channel, Long startTime, Long stopTime, IResourcePlayCallback callback); + + /** + * 录像下载 + */ + void startDownload(CommonGbChannel channel, Long startTime, Long stopTime, Integer downloadSpeed, IResourcePlayCallback playCallback); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java index f340195d..f2b512bc 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java @@ -784,4 +784,9 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { // TODO 向国标级联发送catalog } + + @Override + public void onlineForList(List commonChannelIdList) { + // TODO 向国标级联发送catalog + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index 89312035..0d7dbaca 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -265,12 +265,18 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { @Override public void batchAddChannel(List channels) { + List commonGbChannelList = new ArrayList<>(); + channels.stream().forEach(channel->{ + commonGbChannelList.add(CommonGbChannel.getInstance(null, channel)); + }); + channelMapper.batchAdd(channels); for (DeviceChannel channel : channels) { if (channel.getParentId() != null) { channelMapper.updateChannelSubCount(channel.getDeviceId(), channel.getParentId()); } } + } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java index 7771e422..8aaf049a 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java @@ -146,8 +146,11 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { } } + return null; + } - + @Override + public CommonGbChannel queryChannelByPlatformIdAndChannelDeviceId(Integer platformId, String channelId) { return null; } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index dfc6111f..7a1801b3 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -395,20 +395,34 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override public void offline(List offlineStreams) { + + List streamPushList = streamPushMapper.getListIn(offlineStreams); + List commonChannelIdList = new ArrayList<>(); + streamPushList.stream().forEach(streamPush -> { + commonChannelIdList.add(streamPush.getCommonGbChannelId()); + }); + // 更新部分设备离线 - List onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams); - streamPushMapper.offline(offlineStreams); - // 发送通知 - eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF); + streamPushMapper.offline(streamPushList); + if (!commonChannelIdList.isEmpty()) { + commonGbChannelService.offlineForList(commonChannelIdList); + } + } @Override public void online(List onlineStreams) { - // 更新部分设备上线streamPushService - List onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams); - streamPushMapper.online(onlineStreams); - // 发送通知 - eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON); + List streamPushList = streamPushMapper.getListIn(onlineStreams); + List commonChannelIdList = new ArrayList<>(); + streamPushList.stream().forEach(streamPush -> { + commonChannelIdList.add(streamPush.getCommonGbChannelId()); + }); + + // 更新部分设备离线 + streamPushMapper.offline(streamPushList); + if (!commonChannelIdList.isEmpty()) { + commonGbChannelService.onlineForList(commonChannelIdList); + } } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java index f201fece..d16bc35e 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java @@ -142,12 +142,12 @@ public interface StreamPushMapper { List getOnlinePusherForGbInList(List offlineStreams); @Update("") - void offline(List offlineStreams); + void offline(List offlineStreams); @Select("") - List getListIn(List streamPushItems); + List getListIn(@Param("streamPushItems") List streamPushItems); @Select("select* from wvp_stream_push where id = #{id}") StreamPush query(@Param("id") Integer id);