优化语音对讲支持根据设备设置释放收到ACK后开始发流

结构优化
648540858 2023-07-02 13:53:45 +08:00
parent 3571ca272b
commit 4604aaea99
9 changed files with 127 additions and 100 deletions

View File

@ -47,8 +47,6 @@ public class UserSetting {
private Boolean syncChannelOnDeviceOnline = Boolean.FALSE;
private Boolean pushStreamAfterAck = Boolean.FALSE;
private Boolean sipLog = Boolean.FALSE;
private Boolean sqlLog = Boolean.FALSE;
private Boolean sendToPlatformsWhenIdLost = Boolean.FALSE;
@ -234,14 +232,6 @@ public class UserSetting {
this.broadcastForPlatform = broadcastForPlatform;
}
public Boolean getPushStreamAfterAck() {
return pushStreamAfterAck;
}
public void setPushStreamAfterAck(Boolean pushStreamAfterAck) {
this.pushStreamAfterAck = pushStreamAfterAck;
}
public Boolean getSipUseSourceIpAsRemoteAddress() {
return sipUseSourceIpAsRemoteAddress;
}

View File

@ -188,8 +188,8 @@ public class Device {
@Schema(description = "设备注册的事务信息")
private SipTransactionInfo sipTransactionInfo;
@Schema(description = "控制语音对讲流程释放收到ACK后发流")
private boolean broadcastPushAfterAck;
public String getDeviceId() {
return deviceId;
@ -465,4 +465,11 @@ public class Device {
/*======================设备主子码流逻辑END=========================*/
public boolean isBroadcastPushAfterAck() {
return broadcastPushAfterAck;
}
public void setBroadcastPushAfterAck(boolean broadcastPushAfterAck) {
this.broadcastPushAfterAck = broadcastPushAfterAck;
}
}

View File

@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@ -10,9 +11,8 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
@ -62,6 +62,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired
private IVideoManagerStorage storager;
@Autowired
private IDeviceService deviceService;
@Autowired
private ZLMRTPServerFactory zlmrtpServerFactory;
@ -87,40 +90,23 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
@Override
public void process(RequestEvent evt) {
CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
String fromUserId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
String toUserId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
logger.info("[收到ACK] 来自->{}", fromUserId);
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
if (sendRtpItem == null) {
logger.warn("[收到ACK]:未找到来自{},目标为({})的推流信息",fromUserId, toUserId);
return;
}
logger.info("[收到ACK]rtp/{}开始级推流, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
// 取消设置的超时任务
dynamicTask.stop(callIdHeader.getCallId());
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(fromUserId);
String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
logger.info("[收到ACK] platformGbId->{}", platformGbId);
if (userSetting.getPushStreamAfterAck()) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId);
// 取消设置的超时任务
dynamicTask.stop(callIdHeader.getCallId());
String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
if (sendRtpItem == null) {
logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId);
return;
}
String isUdp = sendRtpItem.isTcp() ? "0" : "1";
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
logger.info("收到ACKrtp/{}开始级推流, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
Map<String, Object> param = new HashMap<>(12);
param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp());
param.put("stream",sendRtpItem.getStream());
param.put("ssrc", sendRtpItem.getSsrc());
param.put("dst_url",sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
param.put("src_port", sendRtpItem.getLocalPort());
param.put("pt", sendRtpItem.getPt());
param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
param.put("is_udp", isUdp);
if (!sendRtpItem.isTcp()) {
// udp模式下开启rtcp保活
param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
}
if (parentPlatform != null) {
Map<String, Object> param = getSendRtpParam(sendRtpItem);
if (mediaInfo == null) {
RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
@ -130,30 +116,75 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, json, param, callIdHeader);
});
} else {
// 如果是非严格模式,需要关闭端口占用
JSONObject startSendRtpStreamResult = null;
if (sendRtpItem.getLocalPort() != 0) {
if (sendRtpItem.isTcpActive()) {
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
}else {
param.put("dst_url", sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
}
}else {
if (sendRtpItem.isTcpActive()) {
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
}else {
param.put("dst_url", sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
}
}
JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param);
if (startSendRtpStreamResult != null) {
playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader);
}
}
}else {
Device device = deviceService.getDevice(fromUserId);
if (device == null) {
logger.warn("[收到ACK]:来自{},目标为({})的推流信息为找到流体服务[{}]信息",fromUserId, toUserId, sendRtpItem.getMediaServerId());
return;
}
// 设置为收到ACK后发送语音的设备已经在发送200OK开始发流了
if (!device.isBroadcastPushAfterAck()) {
return;
}
if (mediaInfo == null) {
logger.warn("[收到ACK]:来自{},目标为({})的推流信息为找到流体服务[{}]信息",fromUserId, toUserId, sendRtpItem.getMediaServerId());
return;
}
Map<String, Object> param = getSendRtpParam(sendRtpItem);
JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param);
if (startSendRtpStreamResult != null) {
playService.startSendRtpStreamHand(sendRtpItem, device, startSendRtpStreamResult, param, callIdHeader);
}
}
}
private Map<String, Object> getSendRtpParam(SendRtpItem sendRtpItem) {
String isUdp = sendRtpItem.isTcp() ? "0" : "1";
Map<String, Object> param = new HashMap<>(12);
param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp());
param.put("stream",sendRtpItem.getStream());
param.put("ssrc", sendRtpItem.getSsrc());
param.put("dst_url",sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
param.put("src_port", sendRtpItem.getLocalPort());
param.put("pt", sendRtpItem.getPt());
param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
param.put("is_udp", isUdp);
if (!sendRtpItem.isTcp()) {
// udp模式下开启rtcp保活
param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
}
return param;
}
private JSONObject sendRtp(SendRtpItem sendRtpItem, MediaServerItem mediaInfo, Map<String, Object> param){
JSONObject startSendRtpStreamResult = null;
if (sendRtpItem.getLocalPort() != 0) {
if (sendRtpItem.isTcpActive()) {
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
}else {
param.put("dst_url", sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
}
}else {
if (sendRtpItem.isTcpActive()) {
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
}else {
param.put("dst_url", sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
}
}
return startSendRtpStreamResult;
}
}

View File

@ -427,23 +427,18 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
try {
// 超时未收到Ack应该回复bye,当前等待时间为10秒
if (userSetting.getPushStreamAfterAck()) {
dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
logger.info("Ack 等待超时");
mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc());
// 回复bye
try {
cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
}, 60 * 1000);
}
dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
logger.info("Ack 等待超时");
mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc());
// 回复bye
try {
cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
}, 60 * 1000);
SIPResponse sipResponse = responseSdpAck(request, content.toString(), platform);
if (!userSetting.getPushStreamAfterAck()) {
playService.startPushStream(sendRtpItem, sipResponse, platform, request.getCallIdHeader());
}
responseSdpAck(request, content.toString(), platform);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
}
@ -650,7 +645,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (response != null) {
sendRtpItem.setToTag(response.getToTag());
}
redisCatchStorage.updateSendRTPSever(sendRtpItem);
} else {
@ -888,16 +882,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
content.append("f=\r\n");
try {
SIPResponse sipResponse = responseSdpAck(request, content.toString(), platform);
if (!userSetting.getPushStreamAfterAck()) {
playService.startPushStream(sendRtpItem, sipResponse, platform, request.getCallIdHeader());
}
return sipResponse;
} catch (SipException e) {
logger.error("未处理的异常 ", e);
} catch (InvalidArgumentException e) {
logger.error("未处理的异常 ", e);
} catch (ParseException e) {
return responseSdpAck(request, content.toString(), platform);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("未处理的异常 ", e);
}
return null;
@ -1132,7 +1118,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
audioBroadcastManager.update(audioBroadcastCatch);
// 开启发流大华在收到200OK后就会开始建立连接
if (!userSetting.getPushStreamAfterAck()) {
if (!device.isBroadcastPushAfterAck()) {
playService.startPushStream(sendRtpItem, sipResponse, parentPlatform, request.getCallIdHeader());
}

View File

@ -64,7 +64,7 @@ public interface IPlayService {
void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader);
void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
void startSendRtpStreamHand(SendRtpItem sendRtpItem, Object correlationInfo,
JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader);
void talkCmd(Device device, String channelId, MediaServerItem mediaServerItem, String stream, AudioBroadcastEvent event);

View File

@ -1481,7 +1481,7 @@ public class PlayServiceImpl implements IPlayService {
}
@Override
public void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
public void startSendRtpStreamHand(SendRtpItem sendRtpItem, Object correlationInfo,
JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
if (jsonObject == null) {
logger.error("RTP推流失败: 请检查ZLM服务");
@ -1504,10 +1504,13 @@ public class PlayServiceImpl implements IPlayService {
}
} else {
// 向上级平台
try {
commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
if (correlationInfo instanceof ParentPlatform) {
try {
ParentPlatform parentPlatform = (ParentPlatform)correlationInfo;
commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
}
}
}

View File

@ -43,6 +43,7 @@ public interface DeviceMapper {
"on_line," +
"media_server_id," +
"switch_primary_sub_stream," +
"broadcast_push_after_ack," +
"(SELECT count(0) FROM wvp_device_channel WHERE device_id=wvp_device.device_id) as channel_count "+
" FROM wvp_device WHERE device_id = #{deviceId}")
Device getDeviceByDeviceId(String deviceId);
@ -73,6 +74,7 @@ public interface DeviceMapper {
"subscribe_cycle_for_alarm,"+
"ssrc_check,"+
"as_message_channel,"+
"broadcast_push_after_ack,"+
"geo_coord_sys,"+
"on_line"+
") VALUES (" +
@ -101,6 +103,7 @@ public interface DeviceMapper {
"#{subscribeCycleForAlarm}," +
"#{ssrcCheck}," +
"#{asMessageChannel}," +
"#{broadcastPushAfterAck}," +
"#{geoCoordSys}," +
"#{onLine}" +
")")
@ -155,6 +158,7 @@ public interface DeviceMapper {
"subscribe_cycle_for_alarm,"+
"ssrc_check,"+
"as_message_channel,"+
"broadcast_push_after_ack,"+
"geo_coord_sys,"+
"on_line,"+
"media_server_id,"+
@ -196,6 +200,7 @@ public interface DeviceMapper {
"subscribe_cycle_for_alarm,"+
"ssrc_check,"+
"as_message_channel,"+
"broadcast_push_after_ack,"+
"geo_coord_sys,"+
"on_line"+
" FROM wvp_device WHERE on_line = true")
@ -226,6 +231,7 @@ public interface DeviceMapper {
"subscribe_cycle_for_alarm,"+
"ssrc_check,"+
"as_message_channel,"+
"broadcast_push_after_ack,"+
"geo_coord_sys,"+
"on_line"+
" FROM wvp_device WHERE ip = #{host} AND port=#{port}")
@ -247,6 +253,7 @@ public interface DeviceMapper {
"<if test=\"subscribeCycleForAlarm != null\">, subscribe_cycle_for_alarm=#{subscribeCycleForAlarm}</if>" +
"<if test=\"ssrcCheck != null\">, ssrc_check=#{ssrcCheck}</if>" +
"<if test=\"asMessageChannel != null\">, as_message_channel=#{asMessageChannel}</if>" +
"<if test=\"broadcastPushAfterAck != null\">, broadcast_push_after_ack=#{broadcastPushAfterAck}</if>" +
"<if test=\"geoCoordSys != null\">, geo_coord_sys=#{geoCoordSys}</if>" +
"<if test=\"switchPrimarySubStream != null\">, switch_primary_sub_stream=#{switchPrimarySubStream}</if>" +
"<if test=\"mediaServerId != null\">, media_server_id=#{mediaServerId}</if>" +
@ -264,6 +271,7 @@ public interface DeviceMapper {
"charset,"+
"ssrc_check,"+
"as_message_channel,"+
"broadcastPushAfterAck,"+
"geo_coord_sys,"+
"on_line,"+
"media_server_id,"+
@ -278,6 +286,7 @@ public interface DeviceMapper {
"#{charset}," +
"#{ssrcCheck}," +
"#{asMessageChannel}," +
"#{broadcastPushAfterAck}," +
"#{geoCoordSys}," +
"#{onLine}," +
"#{mediaServerId}," +

Binary file not shown.

View File

@ -70,6 +70,7 @@
<el-form-item label="其他选项">
<el-checkbox label="SSRC校验" v-model="form.ssrcCheck" style="float: left"></el-checkbox>
<el-checkbox label="作为消息通道" v-model="form.asMessageChannel" style="float: left"></el-checkbox>
<el-checkbox label="收到ACK后发流" v-model="form.broadcastPushAfterAck" style="float: left"></el-checkbox>
</el-form-item>
<el-form-item>
<div style="float: right;">