优化下级平台自定义ssrc的情况,优化国标录像下载流程

pull/829/head
648540858 2023-04-14 14:59:22 +08:00
parent 1f02cb9178
commit d46fc9de82
9 changed files with 141 additions and 20 deletions

View File

@ -0,0 +1,5 @@
package com.genersoft.iot.vmp.common;
public interface CommonCallback<T>{
public void run(T t);
}

View File

@ -546,7 +546,7 @@ public class SIPCommander implements ISIPCommander {
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, null, mediaServerItem.getId()); HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, null, mediaServerItem.getId());
// 添加订阅 // 添加订阅
CallIdHeader newCallIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport()); CallIdHeader newCallIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport());
String callId=newCallIdHeader.getCallId(); String callId= newCallIdHeader.getCallId();
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
logger.debug("sipc 添加订阅===callId {}",callId); logger.debug("sipc 添加订阅===callId {}",callId);
hookEvent.call(new InviteStreamInfo(mediaServerItem, json,callId, "rtp", ssrcInfo.getStream())); hookEvent.call(new InviteStreamInfo(mediaServerItem, json,callId, "rtp", ssrcInfo.getStream()));
@ -558,7 +558,7 @@ public class SIPCommander implements ISIPCommander {
(MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd) -> { (MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd) -> {
logger.info("[录像]下载结束, 发送BYE"); logger.info("[录像]下载结束, 发送BYE");
try { try {
streamByeCmd(device, channelId, ssrcInfo.getStream(),callId); streamByeCmd(device, channelId, ssrcInfo.getStream(), callId);
} catch (InvalidArgumentException | ParseException | SipException | } catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) { SsrcTransactionNotFoundException e) {
logger.error("[录像]下载结束, 发送BYE失败 {}", e.getMessage()); logger.error("[录像]下载结束, 发送BYE失败 {}", e.getMessage());
@ -580,8 +580,6 @@ public class SIPCommander implements ISIPCommander {
if (ssrcIndex >= 0) { if (ssrcIndex >= 0) {
ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
} }
logger.debug("接收到的下载响应ssrc====>{}",ssrcInfo.getSsrc());
logger.debug("接收到的下载响应ssrc====>{}",ssrc);
streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.download); streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.download);
okEvent.response(event); okEvent.response(event);
}); });

View File

@ -12,6 +12,9 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPRequest;
@ -58,6 +61,9 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
@Autowired @Autowired
private VideoStreamSessionManager sessionManager; private VideoStreamSessionManager sessionManager;
@Autowired
private ZlmHttpHookSubscribe subscribe;
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
notifyMessageHandler.addHandler(cmdType, this); notifyMessageHandler.addHandler(cmdType, this);
@ -93,6 +99,9 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
} catch (InvalidArgumentException | ParseException | SsrcTransactionNotFoundException | SipException e) { } catch (InvalidArgumentException | ParseException | SsrcTransactionNotFoundException | SipException e) {
logger.error("[录像流]推送完毕,收到关流通知, 发送BYE失败 {}", e.getMessage()); logger.error("[录像流]推送完毕,收到关流通知, 发送BYE失败 {}", e.getMessage());
} }
// 去除监听流注销自动停止下载的监听
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcTransaction.getStream(), false, "rtsp", ssrcTransaction.getMediaServerId());
subscribe.removeSubscribe(hookSubscribe);
// 如果级联播放,需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题需要将点播CallId进行上下级绑定 // 如果级联播放,需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题需要将点播CallId进行上下级绑定
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, ssrcTransaction.getChannelId(), null, null); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, ssrcTransaction.getChannelId(), null, null);

View File

@ -276,6 +276,10 @@ public class ZLMRESTfulUtils {
return sendPost(mediaServerItem, "closeRtpServer",param, null); return sendPost(mediaServerItem, "closeRtpServer",param, null);
} }
public void closeRtpServer(MediaServerItem mediaServerItem, Map<String, Object> param, RequestCallback callback) {
sendPost(mediaServerItem, "closeRtpServer",param, callback);
}
public JSONObject listRtpServer(MediaServerItem mediaServerItem) { public JSONObject listRtpServer(MediaServerItem mediaServerItem) {
return sendPost(mediaServerItem, "listRtpServer",null, null); return sendPost(mediaServerItem, "listRtpServer",null, null);
} }

View File

@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.*;
@ -164,6 +165,31 @@ public class ZLMRTPServerFactory {
return result; return result;
} }
public void closeRtpServer(MediaServerItem serverItem, String streamId, CommonCallback<Boolean> callback) {
if (serverItem == null) {
callback.run(false);
return;
}
Map<String, Object> param = new HashMap<>();
param.put("stream_id", streamId);
zlmresTfulUtils.closeRtpServer(serverItem, param, jsonObject -> {
if (jsonObject != null ) {
if (jsonObject.getInteger("code") == 0) {
callback.run(jsonObject.getInteger("hit") == 1);
return;
}else {
logger.error("关闭RTP Server 失败: " + jsonObject.getString("msg"));
}
}else {
// 检查ZLM状态
logger.error("关闭RTP Server 失败: 请检查ZLM服务");
}
callback.run(false);
});
}
/** /**
* *

View File

@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.service; package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData;
@ -51,6 +52,8 @@ public interface IMediaServerService {
void closeRTPServer(MediaServerItem mediaServerItem, String streamId); void closeRTPServer(MediaServerItem mediaServerItem, String streamId);
void closeRTPServer(MediaServerItem mediaServerItem, String streamId, CommonCallback<Boolean> callback);
void closeRTPServer(String mediaServerId, String streamId); void closeRTPServer(String mediaServerId, String streamId);
void clearRTPServer(MediaServerItem mediaServerItem); void clearRTPServer(MediaServerItem mediaServerItem);

View File

@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.SipConfig;
@ -172,6 +173,15 @@ public class MediaServerServiceImpl implements IMediaServerService {
zlmrtpServerFactory.closeRtpServer(mediaServerItem, streamId); zlmrtpServerFactory.closeRtpServer(mediaServerItem, streamId);
} }
@Override
public void closeRTPServer(MediaServerItem mediaServerItem, String streamId, CommonCallback<Boolean> callback) {
if (mediaServerItem == null) {
callback.run(false);
return;
}
zlmrtpServerFactory.closeRtpServer(mediaServerItem, streamId, callback);
}
@Override @Override
public void closeRTPServer(String mediaServerId, String streamId) { public void closeRTPServer(String mediaServerId, String streamId) {
MediaServerItem mediaServerItem = this.getOne(mediaServerId); MediaServerItem mediaServerItem = this.getOne(mediaServerId);

View File

@ -328,9 +328,30 @@ public class PlayServiceImpl implements IPlayService {
}); });
} }
// 关闭rtp server // 关闭rtp server
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream(), result->{
// 重新开启ssrc server if (result) {
mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, ssrcInfo.getPort()); // 重新开启ssrc server
mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, ssrcInfo.getPort());
}else {
try {
logger.warn("[停止点播] {}/{}", device.getDeviceId(), channelId);
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
dynamicTask.stop(timeOutTaskKey);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
event.msg = "下级自定义了ssrc,重新设置收流信息失败";
event.statusCode = 500;
errorEvent.response(event);
}
});
} }
} }
@ -472,7 +493,7 @@ public class PlayServiceImpl implements IPlayService {
if (device == null) { if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备: " + deviceId + "不存在"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备: " + deviceId + "不存在");
} }
logger.info("[回放消息] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
PlayBackResult<StreamInfo> playBackResult = new PlayBackResult<>(); PlayBackResult<StreamInfo> playBackResult = new PlayBackResult<>();
String playBackTimeOutTaskKey = UUID.randomUUID().toString(); String playBackTimeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(playBackTimeOutTaskKey, () -> { dynamicTask.startDelay(playBackTimeOutTaskKey, () -> {
@ -546,6 +567,7 @@ public class PlayServiceImpl implements IPlayService {
if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) { if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) {
// ssrc 不可用 // ssrc 不可用
// 释放ssrc // 释放ssrc
dynamicTask.stop(playBackTimeOutTaskKey);
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
eventResult.msg = "下级自定义了ssrc,但是此ssrc不可用"; eventResult.msg = "下级自定义了ssrc,但是此ssrc不可用";
@ -568,10 +590,31 @@ public class PlayServiceImpl implements IPlayService {
hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream())); hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));
}); });
} }
// 关闭rtp server // 关闭rtp server
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream(), result->{
// 重新开启ssrc server if (result) {
mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort()); // 重新开启ssrc server
mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort());
}else {
try {
logger.warn("[回放消息]停止 {}/{}", device.getDeviceId(), channelId);
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
logger.error("[命令发送失败] 停止点播 停止, 发送BYE: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
dynamicTask.stop(playBackTimeOutTaskKey);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
errorEvent.response(eventResult);
eventResult.msg = "下级自定义了ssrc,重新设置收流信息失败";
eventResult.statusCode = 500;
errorEvent.response(eventResult);
}
});
} }
} }
} }
@ -619,7 +662,7 @@ public class PlayServiceImpl implements IPlayService {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "设备:" + deviceId + "不存在"); throw new ControllerException(ErrorCode.ERROR400.getCode(), "设备:" + deviceId + "不存在");
} }
PlayBackResult<StreamInfo> downloadResult = new PlayBackResult<>(); PlayBackResult<StreamInfo> downloadResult = new PlayBackResult<>();
logger.info("[录像下载] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
String downLoadTimeOutTaskKey = UUID.randomUUID().toString(); String downLoadTimeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(downLoadTimeOutTaskKey, () -> { dynamicTask.startDelay(downLoadTimeOutTaskKey, () -> {
logger.warn(String.format("录像下载请求超时deviceId%s channelId%s", deviceId, channelId)); logger.warn(String.format("录像下载请求超时deviceId%s channelId%s", deviceId, channelId));
@ -648,7 +691,7 @@ public class PlayServiceImpl implements IPlayService {
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
}; };
InviteStreamCallback hookEvent = (InviteStreamInfo inviteStreamInfo) -> { InviteStreamCallback hookEvent = (InviteStreamInfo inviteStreamInfo) -> {
logger.info("收到订阅消息: " + inviteStreamInfo.getCallId()); logger.info("[录像下载]收到订阅消息: " + inviteStreamInfo.getCallId());
dynamicTask.stop(downLoadTimeOutTaskKey); dynamicTask.stop(downLoadTimeOutTaskKey);
StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId); StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
streamInfo.setStartTime(startTime); streamInfo.setStartTime(startTime);
@ -678,9 +721,9 @@ public class PlayServiceImpl implements IPlayService {
if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
return; return;
} }
logger.info("[回放消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); logger.info("[录像下载] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
logger.info("[回放消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); logger.info("[录像下载] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) { if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) {
// ssrc 不可用 // ssrc 不可用
@ -707,14 +750,34 @@ public class PlayServiceImpl implements IPlayService {
hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream())); hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));
}); });
} }
// 关闭rtp server // 关闭rtp server
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream(), result->{
// 重新开启ssrc server if (result) {
mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort()); // 重新开启ssrc server
mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort());
}else {
try {
logger.warn("[录像下载] 停止{}/{}", device.getDeviceId(), channelId);
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
logger.error("[命令发送失败] 录像下载停止, 发送BYE: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
dynamicTask.stop(downLoadTimeOutTaskKey);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
eventResult.msg = "下级自定义了ssrc,重新设置收流信息失败";
eventResult.statusCode = 500;
errorEvent.response(eventResult);
}
});
} }
} }
} }
}); });
} catch (InvalidArgumentException | SipException | ParseException e) { } catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 录像下载: {}", e.getMessage()); logger.error("[命令发送失败] 录像下载: {}", e.getMessage());

View File

@ -96,7 +96,10 @@ export default {
}); });
}, },
close: function (){ close: function (){
this.stopDownloadRecord(); if (this.streamInfo.progress < 1) {
this.stopDownloadRecord();
}
if (this.timer !== null) { if (this.timer !== null) {
window.clearTimeout(this.timer); window.clearTimeout(this.timer);
this.timer = null; this.timer = null;