Merge branch 'main' into main2

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java
#	src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
#	src/main/resources/all-application.yml
结构优化
648540858 2023-01-05 11:07:06 +08:00
commit 6fa5b37b96
15 changed files with 343 additions and 146 deletions

View File

@ -69,6 +69,7 @@ public class VideoManagerConstants {
public static final String SYSTEM_INFO_NET_PREFIX = "VMP_SYSTEM_INFO_NET_"; public static final String SYSTEM_INFO_NET_PREFIX = "VMP_SYSTEM_INFO_NET_";
public static final String SYSTEM_INFO_DISK_PREFIX = "VMP_SYSTEM_INFO_DISK_"; public static final String SYSTEM_INFO_DISK_PREFIX = "VMP_SYSTEM_INFO_DISK_";
public static final String BROADCAST_WAITE_INVITE = "task_broadcast_waite_invite_";

View File

@ -43,6 +43,8 @@ public class UserSetting {
private Boolean syncChannelOnDeviceOnline = Boolean.FALSE; private Boolean syncChannelOnDeviceOnline = Boolean.FALSE;
private Boolean pushStreamAfterAck = Boolean.FALSE;
private String serverId = "000000"; private String serverId = "000000";
private String thirdPartyGBIdReg = "[\\s\\S]*"; private String thirdPartyGBIdReg = "[\\s\\S]*";
@ -206,4 +208,12 @@ public class UserSetting {
public void setBroadcastForPlatform(String broadcastForPlatform) { public void setBroadcastForPlatform(String broadcastForPlatform) {
this.broadcastForPlatform = broadcastForPlatform; this.broadcastForPlatform = broadcastForPlatform;
} }
public Boolean getPushStreamAfterAck() {
return pushStreamAfterAck;
}
public void setPushStreamAfterAck(Boolean pushStreamAfterAck) {
this.pushStreamAfterAck = pushStreamAfterAck;
}
} }

View File

@ -83,4 +83,19 @@ public class AudioBroadcastManager {
return audioBroadcastCatch; return audioBroadcastCatch;
} }
public List<AudioBroadcastCatch> get(String deviceId) {
List<AudioBroadcastCatch> audioBroadcastCatchList= new ArrayList<>();
if (SipUtils.isFrontEnd(deviceId)) {
audioBroadcastCatchList.add(data.get(deviceId));
}else {
for (String key : data.keySet()) {
if (key.startsWith(deviceId)) {
audioBroadcastCatchList.add(data.get(key));
}
}
}
return audioBroadcastCatchList;
}
} }

View File

@ -1,14 +1,10 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request; package com.genersoft.iot.vmp.gb28181.transmit.event.request;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse; import gov.nist.javax.sip.message.SIPResponse;
import gov.nist.javax.sip.stack.SIPServerTransactionImpl;
import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.ArrayUtils;
import org.dom4j.Document; import org.dom4j.Document;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
@ -17,14 +13,14 @@ import org.dom4j.io.SAXReader;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.security.core.parameters.P;
import javax.sip.*; import javax.sip.*;
import javax.sip.address.Address; import javax.sip.address.Address;
import javax.sip.address.AddressFactory; import javax.sip.address.AddressFactory;
import javax.sip.address.SipURI; import javax.sip.address.SipURI;
import javax.sip.header.*; import javax.sip.header.ContentTypeHeader;
import javax.sip.header.ExpiresHeader;
import javax.sip.header.HeaderFactory;
import javax.sip.message.MessageFactory; import javax.sip.message.MessageFactory;
import javax.sip.message.Request; import javax.sip.message.Request;
import javax.sip.message.Response; import javax.sip.message.Response;
@ -157,7 +153,10 @@ public abstract class SIPRequestProcessorParent {
responseAckExtraParam.content = sdp; responseAckExtraParam.content = sdp;
responseAckExtraParam.sipURI = sipURI; responseAckExtraParam.sipURI = sipURI;
return responseAck(request, Response.OK, null, responseAckExtraParam); SIPResponse sipResponse = responseAck(request, Response.OK, null, responseAckExtraParam);
return sipResponse;
} }
/** /**
@ -190,7 +189,8 @@ public abstract class SIPRequestProcessorParent {
reader.setEncoding(charset); reader.setEncoding(charset);
// 对海康出现的未转义字符做处理。 // 对海康出现的未转义字符做处理。
String[] destStrArray = new String[]{"&lt;","&gt;","&amp;","&apos;","&quot;"}; String[] destStrArray = new String[]{"&lt;","&gt;","&amp;","&apos;","&quot;"};
char despChar = '&'; // 或许可扩展兼容其他字符 // 或许可扩展兼容其他字符
char despChar = '&';
byte destBye = (byte) despChar; byte destBye = (byte) despChar;
List<Byte> result = new ArrayList<>(); List<Byte> result = new ArrayList<>();
byte[] rawContent = request.getRawContent(); byte[] rawContent = request.getRawContent();
@ -220,4 +220,5 @@ public abstract class SIPRequestProcessorParent {
return xml.getRootElement(); return xml.getRootElement();
} }
} }

View File

@ -1,24 +1,18 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@ -29,15 +23,15 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.address.SipURI; import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader; import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader; import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress; import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader; import javax.sip.header.ToHeader;
import java.text.ParseException; import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
/** /**
* SIP ACK * SIP ACK
@ -46,7 +40,7 @@ import java.text.ParseException;
@Component @Component
public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class); private final Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class);
private final String method = "ACK"; private final String method = "ACK";
@Autowired @Autowired
@ -73,32 +67,21 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired @Autowired
private IMediaServerService mediaServerService; private IMediaServerService mediaServerService;
@Autowired
private ZlmHttpHookSubscribe subscribe;
@Autowired @Autowired
private DynamicTask dynamicTask; private DynamicTask dynamicTask;
@Autowired
private ISIPCommander cmder;
@Autowired
private IDeviceService deviceService;
@Autowired
private ISIPCommanderForPlatform commanderForPlatform;
@Autowired
private AudioBroadcastManager audioBroadcastManager;
@Autowired @Autowired
private RedisGbPlayMsgListener redisGbPlayMsgListener; private RedisGbPlayMsgListener redisGbPlayMsgListener;
@Autowired
private UserSetting userSetting;
@Autowired
private IPlayService playService;
/** /**
* ACK * ACK
*
* @param evt
*/ */
@Override @Override
public void process(RequestEvent evt) { public void process(RequestEvent evt) {
@ -106,44 +89,45 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
logger.info("[收到ACK] platformGbId->{}", platformGbId); logger.info("[收到ACK] platformGbId->{}", platformGbId);
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId); if (userSetting.getPushStreamAfterAck()) {
// 取消设置的超时任务 ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId);
dynamicTask.stop(callIdHeader.getCallId()); // 取消设置的超时任务
String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); dynamicTask.stop(callIdHeader.getCallId());
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
if (sendRtpItem == null) { SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId); if (sendRtpItem == null) {
return; logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId);
} return;
String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; }
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
logger.info("收到ACKrtp/{}开始向上级推流, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStreamId(), MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); logger.info("收到ACKrtp/{}开始向上级推流, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStreamId(),
if (mediaInfo == null) { sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( if (mediaInfo == null) {
sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(),
sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, json, callIdHeader); redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
}); startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, json, callIdHeader);
}else { });
JSONObject startSendRtpStreamResult = zlmrtpServerFactory.startSendRtp(mediaInfo, sendRtpItem); }else {
if (startSendRtpStreamResult != null) { JSONObject startSendRtpStreamResult = zlmrtpServerFactory.startSendRtp(mediaInfo, sendRtpItem);
startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, callIdHeader); if (startSendRtpStreamResult != null) {
startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, callIdHeader);
}
} }
} }
} }
private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
JSONObject jsonObject, CallIdHeader callIdHeader) { JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
if (jsonObject == null) { if (jsonObject == null) {
logger.error("RTP推流失败: 请检查ZLM服务"); logger.error("RTP推流失败: 请检查ZLM服务");
} else if (jsonObject.getInteger("code") == 0) { } else if (jsonObject.getInteger("code") == 0) {
logger.info("调用ZLM推流接口, 结果: {}", jsonObject); logger.info("调用ZLM推流接口, 结果: {}", jsonObject);
logger.info("RTP推流成功[ {}/{} ]{}->{}:{}, " ,sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getIp(), sendRtpItem.getPort()); logger.info("RTP推流成功[ {}/{} ]{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
} else { } else {
logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(sendRtpItem)); logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param));
if (sendRtpItem.isOnlyAudio()) { if (sendRtpItem.isOnlyAudio()) {
Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
@ -152,17 +136,12 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
} catch (SipException | ParseException | InvalidArgumentException | } catch (SipException | ParseException | InvalidArgumentException |
SsrcTransactionNotFoundException e) { SsrcTransactionNotFoundException e) {
logger.error("[命令发送失败] 停止语音喊话: {}", e.getMessage()); logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage());
} }
} }
}else {
// 向上级平台
try {
commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
} }
} }
} }
} }

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
@ -439,18 +440,23 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
try { try {
// 超时未收到Ack应该回复bye,当前等待时间为10秒 // 超时未收到Ack应该回复bye,当前等待时间为10秒
dynamicTask.startDelay(callIdHeader.getCallId(), () -> { if (userSetting.getPushStreamAfterAck()) {
logger.info("Ack 等待超时"); dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()); logger.info("Ack 等待超时");
// 回复bye mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc());
try { // 回复bye
cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); try {
} catch (SipException | InvalidArgumentException | ParseException e) { cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); } catch (SipException | InvalidArgumentException | ParseException e) {
} logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}, 60 * 1000); }
responseSdpAck(request, content.toString(), platform); }, 60 * 1000);
}
SIPResponse sipResponse = responseSdpAck(request, content.toString(), platform);
if (!userSetting.getPushStreamAfterAck()) {
playService.startPushStream(sendRtpItem, sipResponse, platform, request.getCallIdHeader());
}
} catch (SipException e) { } catch (SipException e) {
e.printStackTrace(); e.printStackTrace();
} catch (InvalidArgumentException e) { } catch (InvalidArgumentException e) {
@ -878,7 +884,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
content.append("f=\r\n"); content.append("f=\r\n");
try { try {
return responseSdpAck(request, content.toString(), platform); SIPResponse sipResponse = responseSdpAck(request, content.toString(), platform);
if (!userSetting.getPushStreamAfterAck()) {
playService.startPushStream(sendRtpItem, sipResponse, platform, request.getCallIdHeader());
}
return sipResponse;
} catch (SipException e) { } catch (SipException e) {
e.printStackTrace(); e.printStackTrace();
} catch (InvalidArgumentException e) { } catch (InvalidArgumentException e) {
@ -905,11 +915,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
if (device != null) { if (device != null) {
logger.info("收到设备" + requesterId + "的语音广播Invite请求"); logger.info("收到设备" + requesterId + "的语音广播Invite请求");
String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId() + audioBroadcastCatch.getChannelId();
dynamicTask.stop(key);
try { try {
responseAck(request, Response.TRYING); responseAck(request, Response.TRYING);
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); logger.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage());
playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId());
return;
} }
String contentString = new String(request.getRawContent()); String contentString = new String(request.getRawContent());
// jainSip不支持y=字段, 移除移除以解析。 // jainSip不支持y=字段, 移除移除以解析。
@ -964,11 +977,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
responseAck(request, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式发415 responseAck(request, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式发415
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 不支持的媒体格式: {}", e.getMessage()); logger.error("[命令发送失败] invite 不支持的媒体格式: {}", e.getMessage());
playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId());
return;
} }
return; return;
} }
String addressStr = sdp.getOrigin().getAddress(); String addressStr = sdp.getOrigin().getAddress();
logger.info("设备{}请求语音流,地址:{}:{}ssrc{}", requesterId, addressStr, port, ssrc); logger.info("设备{}请求语音流,地址:{}:{}ssrc{}, {}", requesterId, addressStr, port, ssrc,
mediaTransmissionTCP ? (tcpActive? "TCP主动":"TCP被动") : "UDP");
MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device); MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device);
if (mediaServerItem == null) { if (mediaServerItem == null) {
@ -977,6 +993,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
responseAck(request, Response.BUSY_HERE); responseAck(request, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 未找到可用的zlm: {}", e.getMessage()); logger.error("[命令发送失败] invite 未找到可用的zlm: {}", e.getMessage());
playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId());
} }
return; return;
} }
@ -990,13 +1007,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
responseAck(request, Response.BUSY_HERE); responseAck(request, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId());
return;
} }
return; return;
} }
sendRtpItem.setTcp(mediaTransmissionTCP);
if (tcpActive != null) {
sendRtpItem.setTcpActive(tcpActive);
}
String app = "broadcast"; String app = "broadcast";
String stream = device.getDeviceId() + "_" + audioBroadcastCatch.getChannelId(); String stream = device.getDeviceId() + "_" + audioBroadcastCatch.getChannelId();
@ -1011,6 +1027,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setUsePs(false); sendRtpItem.setUsePs(false);
sendRtpItem.setRtcp(false); sendRtpItem.setRtcp(false);
sendRtpItem.setOnlyAudio(true); sendRtpItem.setOnlyAudio(true);
sendRtpItem.setTcp(mediaTransmissionTCP);
if (tcpActive != null) {
sendRtpItem.setTcpActive(tcpActive);
}
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);
@ -1023,11 +1044,13 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
responseAck(request, Response.GONE); responseAck(request, Response.GONE);
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 语音通话 回复410失败 {}", e.getMessage()); logger.error("[命令发送失败] 语音通话 回复410失败 {}", e.getMessage());
return;
} }
playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId());
} }
} catch (SdpException e) { } catch (SdpException e) {
logger.error("[SDP解析异常]", e); logger.error("[SDP解析异常]", e);
playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId());
} }
} else { } else {
logger.warn("来自无效设备/平台的请求"); logger.warn("来自无效设备/平台的请求");
@ -1084,6 +1107,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
audioBroadcastCatch.setSipTransactionInfoByRequset(sipResponse); audioBroadcastCatch.setSipTransactionInfoByRequset(sipResponse);
audioBroadcastManager.update(audioBroadcastCatch); audioBroadcastManager.update(audioBroadcastCatch);
// 开启发流大华在收到200OK后就会开始建立连接
if (!userSetting.getPushStreamAfterAck()) {
playService.startPushStream(sendRtpItem, sipResponse, parentPlatform, request.getCallIdHeader());
}
} catch (SipException | InvalidArgumentException | ParseException | SdpParseException e) { } catch (SipException | InvalidArgumentException | ParseException | SdpParseException e) {
logger.error("[命令发送失败] 语音喊话 回复200OKSDP: {}", e.getMessage()); logger.error("[命令发送失败] 语音喊话 回复200OKSDP: {}", e.getMessage());
} }

View File

@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch;
import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatchStatus; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatchStatus;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
@ -9,6 +11,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.service.IPlayService;
import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPRequest;
import org.dom4j.Element; import org.dom4j.Element;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -35,11 +38,14 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i
private ResponseMessageHandler responseMessageHandler; private ResponseMessageHandler responseMessageHandler;
@Autowired @Autowired
private DeferredResultHolder deferredResultHolder; private DynamicTask dynamicTask;
@Autowired @Autowired
private AudioBroadcastManager audioBroadcastManager; private AudioBroadcastManager audioBroadcastManager;
@Autowired
private IPlayService playService;
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
responseMessageHandler.addHandler(cmdType, this); responseMessageHandler.addHandler(cmdType, this);
@ -47,6 +53,8 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i
@Override @Override
public void handForDevice(RequestEvent evt, Device device, Element rootElement) { public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
SIPRequest request = (SIPRequest) evt.getRequest();
try { try {
String channelId = getText(rootElement, "DeviceID"); String channelId = getText(rootElement, "DeviceID");
if (!audioBroadcastManager.exit(device.getDeviceId(), channelId)) { if (!audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
@ -55,12 +63,23 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i
return; return;
} }
String result = getText(rootElement, "Result"); String result = getText(rootElement, "Result");
logger.info("收到语音广播的回复 {}{}/{}", result, device.getDeviceId(), channelId ); logger.info("[语音广播]回复:{}, {}/{}", result, device.getDeviceId(), channelId );
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId);
audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite);
audioBroadcastManager.update(audioBroadcastCatch);
// 回复200 OK // 回复200 OK
responseAck((SIPRequest) evt.getRequest(), Response.OK); responseAck(request, Response.OK);
if (result.equalsIgnoreCase("OK")) {
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId);
audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite);
audioBroadcastManager.update(audioBroadcastCatch);
// 等待invite消息 超时则结束
String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId() + channelId;
dynamicTask.startDelay(key, ()->{
logger.info("[语音广播]等待invite消息超时{}/{}", device.getDeviceId(), channelId);
playService.stopAudioBroadcast(device.getDeviceId(), channelId);
}, 2000);
}else {
playService.stopAudioBroadcast(device.getDeviceId(), channelId);
}
} catch (ParseException | SipException | InvalidArgumentException e) { } catch (ParseException | SipException | InvalidArgumentException e) {
logger.error("[命令发送失败] 国标级联 语音喊话: {}", e.getMessage()); logger.error("[命令发送失败] 国标级联 语音喊话: {}", e.getMessage());
} }

View File

@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
public class CatalogResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { public class CatalogResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
private Logger logger = LoggerFactory.getLogger(CatalogResponseMessageHandler.class); private Logger logger = LoggerFactory.getLogger(CatalogResponseMessageHandler.class);
private final String cmdType = "Catalog"; private final String cmdType = "Catalog";
@Autowired @Autowired

View File

@ -323,7 +323,7 @@ public class ZLMHttpHookListener {
}); });
if ("rtsp".equals(param.getSchema())){ if ("rtsp".equals(param.getSchema())){
logger.info("on_stream_changed:注册->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream()); logger.info("流变化:注册->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream());
if (param.isRegist()) { if (param.isRegist()) {
mediaServerService.addCount(param.getMediaServerId()); mediaServerService.addCount(param.getMediaServerId());
}else { }else {
@ -383,10 +383,10 @@ public class ZLMHttpHookListener {
} }
}else { }else {
logger.info("[语音喊话] 推流指向的·通道{}未找到", channelId); logger.info("[语音对讲] 未找到通道:{}", channelId);
} }
}else { }else{
logger.info("[语音喊话] 推流指向的·设备{}未找到", deviceId); logger.info("[语音对讲] 未找到设备:{}", deviceId);
} }
}else { }else {
logger.info("[语音喊话] 推流格式有误, 格式为: broadcast/设备编号_通道编号 "); logger.info("[语音喊话] 推流格式有误, 格式为: broadcast/设备编号_通道编号 ");

View File

@ -36,7 +36,7 @@ public class ZLMRESTfulUtils {
// 设置连接超时时间 // 设置连接超时时间
httpClientBuilder.connectTimeout(5,TimeUnit.SECONDS); httpClientBuilder.connectTimeout(5,TimeUnit.SECONDS);
// 设置读取超时时间 // 设置读取超时时间
httpClientBuilder.readTimeout(5,TimeUnit.SECONDS); httpClientBuilder.readTimeout(15,TimeUnit.SECONDS);
// 设置连接池 // 设置连接池
httpClientBuilder.connectionPool(new ConnectionPool(16, 5, TimeUnit.MINUTES)); httpClientBuilder.connectionPool(new ConnectionPool(16, 5, TimeUnit.MINUTES));
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {

View File

@ -3,9 +3,7 @@ package com.genersoft.iot.vmp.service;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.exception.ServiceException; import com.genersoft.iot.vmp.conf.exception.ServiceException;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@ -15,11 +13,14 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import gov.nist.javax.sip.message.SIPResponse;
import org.springframework.web.context.request.async.DeferredResult; import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
import javax.sip.SipException; import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import java.text.ParseException; import java.text.ParseException;
import java.util.Map;
/** /**
* *
@ -64,4 +65,9 @@ public interface IPlayService {
void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException; void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException;
void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException; void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException;
void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader);
void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader);
} }

View File

@ -3,12 +3,15 @@ package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
@ -32,9 +35,7 @@ import javax.sip.InvalidArgumentException;
import javax.sip.SipException; import javax.sip.SipException;
import java.text.ParseException; import java.text.ParseException;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.*;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -89,6 +90,12 @@ public class DeviceServiceImpl implements IDeviceService {
@Autowired @Autowired
private IMediaServerService mediaServerService; private IMediaServerService mediaServerService;
@Autowired
private AudioBroadcastManager audioBroadcastManager;
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils;
@Override @Override
public void online(Device device) { public void online(Device device) {
logger.info("[设备上线] deviceId{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort()); logger.info("[设备上线] deviceId{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort());
@ -183,6 +190,23 @@ public class DeviceServiceImpl implements IDeviceService {
// 移除订阅 // 移除订阅
removeCatalogSubscribe(device); removeCatalogSubscribe(device);
removeMobilePositionSubscribe(device); removeMobilePositionSubscribe(device);
List<AudioBroadcastCatch> audioBroadcastCatches = audioBroadcastManager.get(deviceId);
if (audioBroadcastCatches.size() > 0) {
for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatches) {
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
if (sendRtpItem != null) {
redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Map<String, Object> param = new HashMap<>();
param.put("vhost", "__defaultVhost__");
param.put("app", sendRtpItem.getApp());
param.put("stream", sendRtpItem.getStreamId());
zlmresTfulUtils.stopSendRtp(mediaInfo, param);
}
audioBroadcastManager.del(deviceId, audioBroadcastCatch.getChannelId());
}
}
} }
@Override @Override

View File

@ -24,16 +24,15 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.service.bean.PlayBackCallback; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.service.bean.PlayBackResult;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
@ -42,6 +41,7 @@ import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent;
import gov.nist.javax.sip.message.SIPResponse;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -54,13 +54,11 @@ import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent; import javax.sip.ResponseEvent;
import javax.sip.SipException; import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.RoundingMode; import java.math.RoundingMode;
import java.text.ParseException; import java.text.ParseException;
import java.util.HashMap; import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@SuppressWarnings(value = {"rawtypes", "unchecked"}) @SuppressWarnings(value = {"rawtypes", "unchecked"})
@Service @Service
@ -119,11 +117,20 @@ public class PlayServiceImpl implements IPlayService {
@Autowired @Autowired
private ZlmHttpHookSubscribe subscribe; private ZlmHttpHookSubscribe subscribe;
@Autowired
private ISIPCommanderForPlatform commanderForPlatform;
@Qualifier("taskExecutor") @Qualifier("taskExecutor")
@Autowired @Autowired
private ThreadPoolTaskExecutor taskExecutor; private ThreadPoolTaskExecutor taskExecutor;
@Autowired
private RedisGbPlayMsgListener redisGbPlayMsgListener;
@Autowired
private ZlmHttpHookSubscribe hookSubscribe;
@Override @Override
public void play(MediaServerItem mediaServerItem, String deviceId, String channelId, public void play(MediaServerItem mediaServerItem, String deviceId, String channelId,
@ -1024,8 +1031,20 @@ public class PlayServiceImpl implements IPlayService {
return false; return false;
} }
// 查询通道使用状态 // 查询通道使用状态
if (audioBroadcastInUse(device, channelId)) { if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
return false; SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
// 查询流是否存在,不存在则认为是异常状态
MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStreamId());
if (streamReady) {
logger.warn("语音广播已经开启: {}", channelId);
event.call("语音广播已经开启");
return;
} else {
stopAudioBroadcast(device.getDeviceId(), channelId);
}
}
} }
// 发送通知 // 发送通知
@ -1063,28 +1082,31 @@ public class PlayServiceImpl implements IPlayService {
@Override @Override
public void stopAudioBroadcast(String deviceId, String channelId) { public void stopAudioBroadcast(String deviceId, String channelId) {
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(deviceId, channelId); List<AudioBroadcastCatch> audioBroadcastCatchList = new ArrayList<>();
if (audioBroadcastCatch != null) { if (channelId == null) {
audioBroadcastCatchList.addAll(audioBroadcastManager.get(deviceId));
}else {
audioBroadcastCatchList.add(audioBroadcastManager.get(deviceId, channelId));
}
if (audioBroadcastCatchList.size() > 0) {
for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatchList) {
Device device = deviceService.getDevice(deviceId);
if (device == null || audioBroadcastCatch == null ) {
return;
}
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
if (sendRtpItem != null) {
redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Map<String, Object> param = new HashMap<>();
param.put("vhost", "__defaultVhost__");
param.put("app", sendRtpItem.getApp());
param.put("stream", sendRtpItem.getStreamId());
zlmresTfulUtils.stopSendRtp(mediaInfo, param);
}
Device device = deviceService.getDevice(deviceId); audioBroadcastManager.del(deviceId, channelId);
if (device == null) {
return;
} }
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
if (sendRtpItem != null) {
redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Map<String, Object> param = new HashMap<>();
param.put("vhost", "__defaultVhost__");
param.put("app", sendRtpItem.getApp());
param.put("stream", sendRtpItem.getStreamId());
zlmresTfulUtils.stopSendRtp(mediaInfo, param);
}
if (audioBroadcastCatch.isFromPlatform()) {
// TODO 向上级发送BYE结束语音喊话
}
audioBroadcastManager.del(deviceId, channelId);
} }
} }
@ -1187,4 +1209,100 @@ public class PlayServiceImpl implements IPlayService {
Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
cmder.playResumeCmd(device, streamInfo); cmder.playResumeCmd(device, streamInfo);
} }
@Override
public void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader) {
// 开始发流
// 取消设置的超时任务
// String channelId = request.getCallIdHeader().getCallId();
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
logger.info("收到ACKrtp/{}开始向上级推流, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStreamId(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
Map<String, Object> param = new HashMap<>(12);
param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp());
param.put("stream",sendRtpItem.getStreamId());
param.put("ssrc", sendRtpItem.getSsrc());
param.put("src_port", sendRtpItem.getLocalPort());
param.put("pt", sendRtpItem.getPt());
param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
param.put("is_udp", is_Udp);
if (!sendRtpItem.isTcp()) {
// udp模式下开启rtcp保活
param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
}
if (mediaInfo == null) {
RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader);
});
} else {
// 如果是非严格模式,需要关闭端口占用
JSONObject startSendRtpStreamResult = null;
if (sendRtpItem.getLocalPort() != 0) {
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId());
hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) {
if (sendRtpItem.isTcpActive()) {
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
}else {
param.put("dst_url", sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
}
}
}else {
if (sendRtpItem.isTcpActive()) {
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
}else {
param.put("dst_url", sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
}
}
if (startSendRtpStreamResult != null) {
startSendRtpStreamHand(sendRtpItem, platform, startSendRtpStreamResult, param, callIdHeader);
}
}
}
@Override
public void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
if (jsonObject == null) {
logger.error("RTP推流失败: 请检查ZLM服务");
} else if (jsonObject.getInteger("code") == 0) {
logger.info("调用ZLM推流接口, 结果: {}", jsonObject);
logger.info("RTP推流成功[ {}/{} ]{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
} else {
logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param));
if (sendRtpItem.isOnlyAudio()) {
Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
if (audioBroadcastCatch != null) {
try {
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
} catch (SipException | ParseException | InvalidArgumentException |
SsrcTransactionNotFoundException e) {
logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage());
}
}
}else {
// 向上级平台
try {
commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
}
}
}
} }

View File

@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@ -90,12 +89,6 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
@Autowired @Autowired
private PlatformGbStreamMapper platformGbStreamMapper; private PlatformGbStreamMapper platformGbStreamMapper;
@Autowired
private IGbStreamService gbStreamService;
@Autowired
private ParentPlatformMapper parentPlatformMapper;
/** /**
* ID * ID
* *

View File

@ -197,6 +197,8 @@ user-settings:
sync-channel-on-device-online: false sync-channel-on-device-online: false
# 国标级联语音喊话发流模式 * UDP:udp传输 TCP-ACTIVEtcp主动模式 TCP-PASSIVEtcp被动模式 # 国标级联语音喊话发流模式 * UDP:udp传输 TCP-ACTIVEtcp主动模式 TCP-PASSIVEtcp被动模式
broadcast-for-platform: UDP broadcast-for-platform: UDP
# 收到ack消息后开始发流默认false 回复200ok后直接开始发流
push-stream-after-ack: false
# 关闭在线文档(生产环境建议关闭) # 关闭在线文档(生产环境建议关闭)
springdoc: springdoc: