修复国标视频点播三种级联并发点播和录像下载的问题

pull/845/head
648540858 2023-05-08 17:18:08 +08:00
parent 490c55381f
commit c429a34e53
20 changed files with 517 additions and 675 deletions

View File

@ -4,7 +4,6 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@ -109,7 +108,7 @@ public interface ISIPCommander {
* @param startTime ,yyyy-MM-dd HH:mm:ss
* @param endTime ,yyyy-MM-dd HH:mm:ss
*/
void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInf, Device device, String channelId, String startTime, String endTime,InviteStreamCallback inviteStreamCallback, InviteStreamCallback event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInf, Device device, String channelId, String startTime, String endTime,ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
/**
*
@ -121,7 +120,7 @@ public interface ISIPCommander {
* @param downloadSpeed
*/
void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
String startTime, String endTime, int downloadSpeed, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent,
String startTime, String endTime, int downloadSpeed, ZlmHttpHookSubscribe.Event hookEvent,
SipSubscribe.Event errorEvent,SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException;
/**

View File

@ -366,11 +366,11 @@ public class SIPCommander implements ISIPCommander {
*/
@Override
public void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
String startTime, String endTime, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent,
String startTime, String endTime, ZlmHttpHookSubscribe.Event hookEvent,
SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getSdpIp(), mediaServerItem.getIp(), ssrcInfo.getPort());
logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getSdpIp(), ssrcInfo.getPort());
String sdpIp;
if (!ObjectUtils.isEmpty(device.getSdpIp())) {
sdpIp = device.getSdpIp();
@ -443,8 +443,7 @@ public class SIPCommander implements ISIPCommander {
// 添加订阅
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
if (hookEvent != null) {
InviteStreamInfo inviteStreamInfo = new InviteStreamInfo(mediaServerItemInUse, json,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), "rtp", ssrcInfo.getStream());
hookEvent.call(inviteStreamInfo);
hookEvent.response(mediaServerItemInUse, json);
}
subscribe.removeSubscribe(hookSubscribe);
});
@ -456,9 +455,6 @@ public class SIPCommander implements ISIPCommander {
streamSession.put(device.getDeviceId(), channelId,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.PLAYBACK);
okEvent.response(event);
});
if (inviteStreamCallback != null) {
inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), "rtp", ssrcInfo.getStream()));
}
}
/**
@ -473,10 +469,10 @@ public class SIPCommander implements ISIPCommander {
@Override
public void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
String startTime, String endTime, int downloadSpeed,
InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent,
ZlmHttpHookSubscribe.Event hookEvent,
SipSubscribe.Event errorEvent,SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException {
logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getSdpIp(), mediaServerItem.getIp(), ssrcInfo.getPort());
logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getSdpIp(), ssrcInfo.getPort());
String sdpIp;
if (!ObjectUtils.isEmpty(device.getSdpIp())) {
sdpIp = device.getSdpIp();
@ -550,7 +546,7 @@ public class SIPCommander implements ISIPCommander {
String callId= newCallIdHeader.getCallId();
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
logger.debug("sipc 添加订阅===callId {}",callId);
hookEvent.call(new InviteStreamInfo(mediaServerItem, json,callId, "rtp", ssrcInfo.getStream()));
hookEvent.response(mediaServerItemInUse, json);
subscribe.removeSubscribe(hookSubscribe);
hookSubscribe.getContent().put("regist", false);
hookSubscribe.getContent().put("schema", "rtsp");
@ -568,9 +564,6 @@ public class SIPCommander implements ISIPCommander {
});
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc());
if (inviteStreamCallback != null) {
inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null,callId, "rtp", ssrcInfo.getStream()));
}
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> {
ResponseEvent responseEvent = (ResponseEvent) event.event;

View File

@ -356,7 +356,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}else {
streamTypeStr = "UDP";
}
logger.info("[上级点播] 平台:{} 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc{}", username, channelId, addressStr, port, streamTypeStr, ssrc);
logger.info("[上级Invite] {}, 平台:{} 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
@ -380,7 +380,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
InviteErrorCallback<Object> hookEvent = (code, msg, data) -> {
StreamInfo streamInfo = (StreamInfo)data;
MediaServerItem mediaServerItemInUSe = mediaServerService.getOne(streamInfo.getMediaServerId());
logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP) {}/{}", streamInfo.getApp(), streamInfo.getStream());
logger.info("[上级Invite]下级已经开始推流。 回复200OK(SDP) {}/{}", streamInfo.getApp(), streamInfo.getStream());
// * 0 等待设备推流上来
// * 1 下级已经推流等待上级平台回复ack
// * 2 推流中
@ -443,22 +443,16 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
DateUtil.formatter.format(end), null, result -> {
if (result.getCode() != 0) {
logger.warn("录像回放失败");
if (result.getEvent() != null) {
// errorEvent.response(result.getEvent());
}
DateUtil.formatter.format(end),
(code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()){
hookEvent.run(code, msg, data);
}else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){
logger.info("[录像回放]超时, 用户:{} 通道:{}", username, channelId);
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
try {
responseAck(request, Response.REQUEST_TIMEOUT);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 录像回放 发送REQUEST_TIMEOUT: {}", e.getMessage());
}
errorEvent.run(code, msg, data);
}else {
if (result.getMediaServerItem() != null) {
// hookEvent.response(result.getMediaServerItem(), result.getResponse());
}
errorEvent.run(code, msg, data);
}
});
} else {
@ -477,6 +471,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){
logger.info("[上级点播]超时, 用户:{} 通道:{}", username, channelId);
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
errorEvent.run(code, msg, data);
}else {
errorEvent.run(code, msg, data);
}

View File

@ -80,7 +80,6 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
public void process(RequestEvent evt) {
try {
RequestEventExt evtExt = (RequestEventExt) evt;
String requestAddress = evtExt.getRemoteIpAddress() + ":" + evtExt.getRemotePort();
SIPRequest request = (SIPRequest)evt.getRequest();
Response response = null;
@ -91,12 +90,13 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
AddressImpl address = (AddressImpl) fromHeader.getAddress();
SipUri uri = (SipUri) address.getURI();
String deviceId = uri.getUser();
logger.info("[注册请求] 设备:{}, 开始处理: {}", deviceId, requestAddress);
Device device = deviceService.getDevice(deviceId);
RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request,
userSetting.getSipUseSourceIpAsRemoteAddress());
logger.info("[注册请求] 设备:{}, 远程地址为: {}:{}", deviceId, remoteAddressInfo.getIp(), remoteAddressInfo.getPort());
String requestAddress = remoteAddressInfo.getIp() + ":" + remoteAddressInfo.getPort();
logger.info("[注册请求] 设备:{}, 开始处理: {}", deviceId, requestAddress);
if (device != null &&
device.getSipTransactionInfo() != null &&
request.getCallIdHeader().getCallId().equals(device.getSipTransactionInfo().getCallId())) {

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.info;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
@ -9,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
@ -17,10 +19,12 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.*;
import javax.sip.header.CallIdHeader;
import javax.sip.header.ContentTypeHeader;
import javax.sip.message.Response;
import java.text.ParseException;
@ -43,6 +47,9 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IInviteStreamService inviteStreamService;
@Autowired
private IVideoManagerStorage storager;
@ -103,13 +110,14 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I
if ("Application".equalsIgnoreCase(contentType) && "MANSRTSP".equalsIgnoreCase(contentSubType)) {
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
String streamId = sendRtpItem.getStreamId();
StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null);
if (null == streamInfo) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
if (null == inviteInfo) {
responseAck(request, Response.NOT_FOUND, "stream " + streamId + " not found");
return;
}
Device device1 = storager.queryVideoDevice(streamInfo.getDeviceID());
cmder.playbackControlCmd(device1,streamInfo,new String(evt.getRequest().getRawContent()),eventResult -> {
Device device1 = storager.queryVideoDevice(inviteInfo.getDeviceId());
if (inviteInfo.getStreamInfo() != null) {
cmder.playbackControlCmd(device1,inviteInfo.getStreamInfo(),new String(evt.getRequest().getRawContent()),eventResult -> {
// 失败的回复
try {
responseAck(request, eventResult.statusCode, eventResult.msg);
@ -125,6 +133,8 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I
}
});
}
}
}
} catch (SipException e) {
logger.warn("SIP 回复错误", e);

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
@ -15,6 +16,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
@ -64,6 +66,12 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
@Autowired
private ZlmHttpHookSubscribe subscribe;
@Autowired
private IInviteStreamService inviteStreamService;
@Autowired
private VideoStreamSessionManager streamSession;
@Override
public void afterPropertiesSet() throws Exception {
notifyMessageHandler.addHandler(cmdType, this);
@ -82,17 +90,15 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
String NotifyType =getText(rootElement, "NotifyType");
if ("121".equals(NotifyType)){
logger.info("[录像流]推送完毕,收到关流通知");
// 查询是设备
StreamInfo streamInfo = redisCatchStorage.queryDownload(null, null, null, callIdHeader.getCallId());
if (streamInfo != null) {
// 设置进度100%
streamInfo.setProgress(1);
redisCatchStorage.startDownload(streamInfo, callIdHeader.getCallId());
}
// 先从会话内查找
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransaction(null, null, callIdHeader.getCallId(), null);
if (ssrcTransaction != null) { // 兼容海康 媒体通知 消息from字段不是设备ID的问题
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(null, null, callIdHeader.getCallId(), null);
if (ssrcTransaction != null) {
logger.info("[录像流]推送完毕,关流通知, device: {}, channelId: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
if (inviteInfo.getStreamInfo() != null) {
inviteInfo.getStreamInfo().setProgress(1);
inviteStreamService.updateInviteInfo(inviteInfo);
}
try {
cmder.streamByeCmd(device, ssrcTransaction.getChannelId(), null, callIdHeader.getCallId());
@ -117,6 +123,8 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
logger.error("[命令发送失败] 国标级联 录像播放完毕: {}", e.getMessage());
}
}
}else {
logger.info("[录像流]推送完毕,关流通知, 但是未找到对应的下载信息");
}
}
}

View File

@ -451,6 +451,11 @@ public class ZLMHttpHookListener {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
// 点播
if (inviteInfo != null) {
// 录像下载
if (inviteInfo.getType() == InviteSessionType.DOWNLOAD) {
ret.put("close", false);
return ret;
}
// 收到无人观看说明流也没有在往上级推送
if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
@ -486,36 +491,6 @@ public class ZLMHttpHookListener {
storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
return ret;
}
// 录像回放
StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null,
param.getStream(), null);
if (streamInfoForPlayBackCatch != null) {
if (streamInfoForPlayBackCatch.isPause()) {
ret.put("close", false);
} else {
Device device = deviceService.getDevice(streamInfoForPlayBackCatch.getDeviceID());
if (device != null) {
try {
cmder.streamByeCmd(device, streamInfoForPlayBackCatch.getChannelId(),
streamInfoForPlayBackCatch.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
logger.error("[无人观看]回放, 发送BYE失败 {}", e.getMessage());
}
}
redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch.getDeviceID(),
streamInfoForPlayBackCatch.getChannelId(), streamInfoForPlayBackCatch.getStream(), null);
}
return ret;
}
// 录像下载
StreamInfo streamInfoForDownload = redisCatchStorage.queryDownload(null, null,
param.getStream(), null);
// 进行录像下载时无人观看不断流
if (streamInfoForDownload != null) {
ret.put("close", false);
return ret;
}
} else {
// 非国标流 推流/拉流代理
// 拉流代理

View File

@ -293,11 +293,14 @@ public class ZLMRTPServerFactory {
if (jsonObject.getInteger("code") == 0) {
localPort = jsonObject.getInteger("port");
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
(MediaServerItem mediaServerItem, JSONObject response)->{
logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc);
keepPort(serverItem, ssrc);
int port = keepPort(serverItem, ssrc);
if (port == 0) {
logger.info("[上级点播] {}->监听端口失败,移除监听", ssrc);
hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
}
});
logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort);
}else {

View File

@ -60,4 +60,9 @@ public interface IInviteStreamService {
* invite
*/
void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data);
/**
* invite
*/
void clearInviteInfo(String deviceId);
}

View File

@ -3,10 +3,8 @@ package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.exception.ServiceException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.bean.InviteErrorCallback;
import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import javax.sip.InvalidArgumentException;
@ -29,13 +27,13 @@ public interface IPlayService {
*/
MediaServerItem getNewMediaServerItemHasAssist(Device device);
void playBack(String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback infoCallBack, PlayBackCallback playBackCallback);
void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack);
void playBack(String deviceId, String channelId, String startTime, String endTime, InviteErrorCallback<Object> callback);
void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, InviteErrorCallback<Object> callback);
void zlmServerOffline(String mediaServerId);
void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback playBackCallback);
void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack);
void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteErrorCallback<Object> callback);
void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteErrorCallback<Object> callback);
StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream);

View File

@ -5,7 +5,7 @@ package com.genersoft.iot.vmp.service.bean;
*/
public enum InviteErrorCode {
SUCCESS(0, "成功"),
ERROR_FOR_SIGNALLING_TIMEOUT(-1, "点播超时"),
ERROR_FOR_SIGNALLING_TIMEOUT(-1, "信令超时"),
ERROR_FOR_STREAM_TIMEOUT(-2, "收流超时"),
ERROR_FOR_RESOURCE_EXHAUSTION(-3, "资源耗尽"),
ERROR_FOR_CATCH_DATA(-4, "缓存数据异常"),
@ -14,7 +14,9 @@ public enum InviteErrorCode {
ERROR_FOR_SDP_PARSING_EXCEPTIONS(-7, "SDP信息解析失败"),
ERROR_FOR_SSRC_UNAVAILABLE(-8, "SSRC不可用"),
ERROR_FOR_RESET_SSRC(-9, "重新设置收流信息失败"),
ERROR_FOR_SIP_SENDING_FAILED(-10, "命令发送失败");
ERROR_FOR_SIP_SENDING_FAILED(-10, "命令发送失败"),
ERROR_FOR_ASSIST_NOT_READY(-11, "没有可用的assist服务"),
ERROR_FOR_PARAMETER_ERROR(-13, "参数异常");
private final int code;
private final String msg;

View File

@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
@ -58,6 +59,9 @@ public class DeviceServiceImpl implements IDeviceService {
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IInviteStreamService inviteStreamService;
@Autowired
private DeviceMapper deviceMapper;
@ -97,7 +101,7 @@ public class DeviceServiceImpl implements IDeviceService {
String now = DateUtil.getNow();
if (deviceInRedis != null && deviceInDb == null) {
// redis 存在脏数据
redisCatchStorage.clearCatchByDeviceId(device.getDeviceId());
inviteStreamService.clearInviteInfo(device.getDeviceId());
}
device.setUpdateTime(now);
if (device.getKeepaliveIntervalTime() == 0) {

View File

@ -175,4 +175,8 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
}
@Override
public void clearInviteInfo(String deviceId) {
removeInviteInfo(null, deviceId, null, null);
}
}

View File

@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteInfo;
@ -17,7 +16,6 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
@ -28,12 +26,13 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.service.bean.InviteErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -182,6 +181,12 @@ public class PlayServiceImpl implements IPlayService {
public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
InviteErrorCallback<Object> callback) {
if (mediaServerItem == null || ssrcInfo == null) {
callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
null);
return;
}
logger.info("[点播开始] deviceId: {}, channelId: {},收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
//端口获取失败的ssrcInfo 没有必要发送点播指令
@ -375,11 +380,10 @@ public class PlayServiceImpl implements IPlayService {
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
if (!result) {
try {
logger.warn("[停止点播] {}/{}", device.getDeviceId(), channelId);
logger.warn("[点播] 更新ssrc失败停止点播 {}/{}", device.getDeviceId(), channelId);
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
dynamicTask.stop(timeOutTaskKey);
@ -459,11 +463,12 @@ public class PlayServiceImpl implements IPlayService {
}
private void onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, PlayBackCallback playBackCallback) {
private StreamInfo onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String startTime, String endTime) {
StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
PlayBackResult<StreamInfo> playBackResult = new PlayBackResult<>();
if (streamInfo != null) {
streamInfo.setStartTime(startTime);
streamInfo.setEndTime(endTime);
DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
if (deviceChannel != null) {
deviceChannel.setStreamId(streamInfo.getStream());
@ -472,20 +477,13 @@ public class PlayServiceImpl implements IPlayService {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, deviceId, channelId);
if (inviteInfo != null) {
inviteInfo.setStatus(InviteSessionStatus.ok);
inviteInfo.setStreamInfo(streamInfo);
inviteStreamService.updateInviteInfo(inviteInfo);
}
playBackResult.setCode(ErrorCode.SUCCESS.getCode());
playBackResult.setMsg(ErrorCode.SUCCESS.getMsg());
playBackResult.setData(streamInfo);
playBackCallback.call(playBackResult);
} else {
logger.warn("录像回放调用失败!");
playBackResult.setCode(ErrorCode.ERROR100.getCode());
playBackResult.setMsg("录像回放调用失败!");
playBackCallback.call(playBackResult);
}
return streamInfo;
}
@Override
@ -524,23 +522,24 @@ public class PlayServiceImpl implements IPlayService {
@Override
public void playBack(String deviceId, String channelId, String startTime,
String endTime, InviteStreamCallback inviteStreamCallback,
PlayBackCallback callback) {
String endTime, InviteErrorCallback<Object> callback) {
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
return;
}
MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback);
playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback);
}
@Override
public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
String deviceId, String channelId, String startTime,
String endTime, InviteStreamCallback infoCallBack,
PlayBackCallback playBackCallback) {
String endTime, InviteErrorCallback<Object> callback) {
if (mediaServerItem == null || ssrcInfo == null) {
callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
null);
return;
}
@ -548,62 +547,61 @@ public class PlayServiceImpl implements IPlayService {
if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备: " + deviceId + "不存在");
}
logger.info("[回放消息] deviceId: {}, channelId: {},收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
PlayBackResult<StreamInfo> playBackResult = new PlayBackResult<>();
logger.info("[录像回放] deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验{}",
device.getDeviceId(), channelId, startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(),
ssrcInfo.getSsrc(), device.isSsrcCheck());
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK,
InviteSessionStatus.ready);
inviteStreamService.updateInviteInfo(inviteInfo);
String playBackTimeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(playBackTimeOutTaskKey, () -> {
logger.warn(String.format("设备回放超时deviceId%s channelId%s", deviceId, channelId));
playBackResult.setCode(ErrorCode.ERROR100.getCode());
playBackResult.setMsg("回放超时");
logger.warn("[录像回放] 超时deviceId{} channelId{}", deviceId, channelId);
inviteStreamService.removeInviteInfo(inviteInfo);
callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
try {
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[录像流]回放超时 发送BYE失败 {}", e.getMessage());
logger.error("[录像回放] 超时 发送BYE失败 {}", e.getMessage());
} catch (SsrcTransactionNotFoundException e) {
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
}
// 回复之前所有的点播请求
playBackCallback.call(playBackResult);
}, userSetting.getPlayTimeout());
SipSubscribe.Event errorEvent = event -> {
logger.info("[录像回放] 失败,{} {}", event.statusCode, event.msg);
dynamicTask.stop(playBackTimeOutTaskKey);
playBackResult.setCode(ErrorCode.ERROR100.getCode());
playBackResult.setMsg(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
playBackResult.setEvent(event);
playBackCallback.call(playBackResult);
callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg), null);
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo);
};
InviteStreamCallback hookEvent = (InviteStreamInfo inviteStreamInfo) -> {
logger.info("收到回放订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, jsonObject) -> {
logger.info("收到回放订阅消息: " + jsonObject);
dynamicTask.stop(playBackTimeOutTaskKey);
StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, jsonObject, deviceId, channelId, startTime, endTime);
if (streamInfo == null) {
logger.warn("设备回放API调用失败");
playBackResult.setCode(ErrorCode.ERROR100.getCode());
playBackResult.setMsg("设备回放API调用失败");
playBackCallback.call(playBackResult);
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
return;
}
redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId());
playBackResult.setCode(ErrorCode.SUCCESS.getCode());
playBackResult.setMsg(ErrorCode.SUCCESS.getMsg());
playBackResult.setData(streamInfo);
playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
playBackResult.setResponse(inviteStreamInfo.getResponse());
playBackCallback.call(playBackResult);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
logger.info("[录像回放] 成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime);
};
try {
cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack,
cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime,
hookEvent, eventResult -> {
if (eventResult.type == SipSubscribe.EventResultType.response) {
inviteInfo.setStatus(InviteSessionStatus.ok);
ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
String contentString = new String(responseEvent.getResponse().getRawContent());
// 获取ssrc
@ -614,21 +612,56 @@ public class PlayServiceImpl implements IPlayService {
String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
// 查询到ssrc不一致且开启了ssrc校验则需要针对处理
if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
String substring = contentString.substring(0, contentString.indexOf("y="));
try {
SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
int port = -1;
Vector mediaDescriptions = sdp.getMediaDescriptions(true);
for (Object description : mediaDescriptions) {
MediaDescription mediaDescription = (MediaDescription) description;
Media media = mediaDescription.getMedia();
Vector mediaFormats = media.getMediaFormats(false);
if (mediaFormats.contains("96")) {
port = media.getMediaPort();
break;
}
}
logger.info("[录像回放-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
logger.info("[录像回放-TCP主动连接对方] 结果: {}", jsonObject);
} catch (SdpException e) {
logger.error("[录像回放-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
dynamicTask.stop(playBackTimeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
}
}
return;
}
logger.info("[回放消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
logger.info("[录像回放] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
logger.info("[回放消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
logger.info("[录像回放] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) {
// ssrc 不可用
logger.info("[录像回放] SSRC修正时发现ssrc不可使用 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
// 释放ssrc
dynamicTask.stop(playBackTimeOutTaskKey);
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
eventResult.msg = "下级自定义了ssrc,但是此ssrc不可用";
eventResult.statusCode = 400;
errorEvent.response(eventResult);
callback.run(InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(),
InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null);
return;
}
@ -637,44 +670,46 @@ public class PlayServiceImpl implements IPlayService {
// 添加订阅
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
subscribe.removeSubscribe(hookSubscribe);
hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
hookSubscribe.getContent().put("stream", stream);
inviteInfo.setStream(stream);
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + response.toJSONString());
dynamicTask.stop(playBackTimeOutTaskKey);
// hook响应
onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, playBackCallback);
hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));
hookEvent.response(mediaServerItemInUse, response);
});
}
// 关闭rtp server
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream(), result->{
if (result) {
// 重新开启ssrc server
mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort(), true, device.getStreamModeForParam());
}else {
// 更新ssrc
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
if (!result) {
try {
logger.warn("[回放消息]停止 {}/{}", device.getDeviceId(), channelId);
logger.warn("[录像回放] 更新ssrc失败停止录像回放 {}/{}", device.getDeviceId(), channelId);
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
logger.error("[命令发送失败] 停止点播 停止 发送BYE: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
logger.error("[命令发送失败] 停止点播 发送BYE: {}", e.getMessage());
}
dynamicTask.stop(playBackTimeOutTaskKey);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
errorEvent.response(eventResult);
eventResult.msg = "下级自定义了ssrc,重新设置收流信息失败";
eventResult.statusCode = 500;
errorEvent.response(eventResult);
}
});
}
}
}
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
}else {
ssrcInfo.setSsrc(ssrcInResponse);
inviteInfo.setSsrcInfo(ssrcInfo);
inviteInfo.setStream(ssrcInfo.getStream());
}
}else {
logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
}
}
inviteStreamService.updateInviteInfo(inviteInfo);
}, errorEvent);
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 回放: {}", e.getMessage());
@ -690,42 +725,50 @@ public class PlayServiceImpl implements IPlayService {
@Override
public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback playBackCallback) {
public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteErrorCallback<Object> callback) {
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
return;
}
MediaServerItem newMediaServerItem = getNewMediaServerItemHasAssist(device);
if (newMediaServerItem == null) {
PlayBackResult<StreamInfo> downloadResult = new PlayBackResult<>();
downloadResult.setCode(ErrorCode.ERROR100.getCode());
downloadResult.setMsg("未找到assist服务");
playBackCallback.call(downloadResult);
callback.run(InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getCode(),
InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getMsg(),
null);
return;
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, infoCallBack, playBackCallback);
download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, callback);
}
@Override
public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteErrorCallback<Object> callback) {
if (mediaServerItem == null || ssrcInfo == null) {
callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
null);
return;
}
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "设备:" + deviceId + "不存在");
callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
"设备:" + deviceId + "不存在",
null);
return;
}
PlayBackResult<StreamInfo> downloadResult = new PlayBackResult<>();
logger.info("[录像下载] deviceId: {}, channelId: {},收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.DOWNLOAD,
InviteSessionStatus.ready);
inviteStreamService.updateInviteInfo(inviteInfo);
String downLoadTimeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(downLoadTimeOutTaskKey, () -> {
logger.warn(String.format("录像下载请求超时deviceId%s channelId%s", deviceId, channelId));
downloadResult.setCode(ErrorCode.ERROR100.getCode());
downloadResult.setMsg("录像下载请求超时");
hookCallBack.call(downloadResult);
inviteStreamService.removeInviteInfo(inviteInfo);
callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
try {
@ -741,31 +784,28 @@ public class PlayServiceImpl implements IPlayService {
SipSubscribe.Event errorEvent = event -> {
dynamicTask.stop(downLoadTimeOutTaskKey);
downloadResult.setCode(ErrorCode.ERROR100.getCode());
downloadResult.setMsg(String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg));
downloadResult.setEvent(event);
hookCallBack.call(downloadResult);
callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg), null);
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo);
};
InviteStreamCallback hookEvent = (InviteStreamInfo inviteStreamInfo) -> {
logger.info("[录像下载]收到订阅消息: " + inviteStreamInfo.getCallId());
ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, jsonObject) -> {
logger.info("[录像下载]收到订阅消息: " + jsonObject);
dynamicTask.stop(downLoadTimeOutTaskKey);
StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
streamInfo.setStartTime(startTime);
streamInfo.setEndTime(endTime);
redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
downloadResult.setCode(ErrorCode.SUCCESS.getCode());
downloadResult.setMsg(ErrorCode.SUCCESS.getMsg());
downloadResult.setData(streamInfo);
downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
downloadResult.setResponse(inviteStreamInfo.getResponse());
hookCallBack.call(downloadResult);
StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, jsonObject, deviceId, channelId, startTime, endTime);
if (streamInfo == null) {
logger.warn("[录像下载] 获取流地址信息失败");
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
return;
}
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
logger.info("[录像下载] 调用成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime);
};
try {
cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, infoCallBack,
hookEvent, errorEvent, eventResult ->
{
if (eventResult.type == SipSubscribe.EventResultType.response) {
cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed,
hookEvent, errorEvent, eventResult ->{
inviteInfo.setStatus(InviteSessionStatus.ok);
ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
String contentString = new String(responseEvent.getResponse().getRawContent());
// 获取ssrc
@ -776,6 +816,41 @@ public class PlayServiceImpl implements IPlayService {
String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
// 查询到ssrc不一致且开启了ssrc校验则需要针对处理
if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
String substring = contentString.substring(0, contentString.indexOf("y="));
try {
SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
int port = -1;
Vector mediaDescriptions = sdp.getMediaDescriptions(true);
for (Object description : mediaDescriptions) {
MediaDescription mediaDescription = (MediaDescription) description;
Media media = mediaDescription.getMedia();
Vector mediaFormats = media.getMediaFormats(false);
if (mediaFormats.contains("96")) {
port = media.getMediaPort();
break;
}
}
logger.info("[录像下载-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
logger.info("[录像下载-TCP主动连接对方] 结果: {}", jsonObject);
} catch (SdpException e) {
logger.error("[录像下载-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
dynamicTask.stop(downLoadTimeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
}
}
return;
}
logger.info("[录像下载] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
@ -787,9 +862,8 @@ public class PlayServiceImpl implements IPlayService {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
eventResult.msg = "下级自定义了ssrc,但是此ssrc不可用";
eventResult.statusCode = 400;
errorEvent.response(eventResult);
callback.run(InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(),
InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null);
return;
}
@ -802,24 +876,19 @@ public class PlayServiceImpl implements IPlayService {
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + response.toJSONString());
dynamicTask.stop(downLoadTimeOutTaskKey);
// hook响应
onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, hookCallBack);
hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));
hookEvent.response(mediaServerItemInUse, response);
});
}
// 关闭rtp server
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream(), result->{
if (result) {
// 重新开启ssrc server
mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort(), true, device.getStreamModeForParam());
}else {
// 更新ssrc
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
if (!result) {
try {
logger.warn("[录像下载] 停止{}/{}", device.getDeviceId(), channelId);
logger.warn("[录像下载] 更新ssrc失败停止录像回放 {}/{}", device.getDeviceId(), channelId);
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
logger.error("[命令发送失败] 录像下载停止, 发送BYE: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
logger.error("[命令发送失败] 停止点播 发送BYE: {}", e.getMessage());
}
dynamicTask.stop(downLoadTimeOutTaskKey);
@ -827,12 +896,17 @@ public class PlayServiceImpl implements IPlayService {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
eventResult.msg = "下级自定义了ssrc,重新设置收流信息失败";
eventResult.statusCode = 500;
errorEvent.response(eventResult);
}
});
callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
}else {
ssrcInfo.setSsrc(ssrcInResponse);
inviteInfo.setSsrcInfo(ssrcInfo);
inviteInfo.setStream(ssrcInfo.getStream());
}
}else {
logger.info("[录像下载] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
}
}
});
@ -849,21 +923,22 @@ public class PlayServiceImpl implements IPlayService {
@Override
public StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream) {
StreamInfo streamInfo = redisCatchStorage.queryDownload(deviceId, channelId, stream, null);
if (streamInfo != null) {
if (streamInfo.getProgress() == 1) {
return streamInfo;
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream);
if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
if (inviteInfo.getStreamInfo().getProgress() == 1) {
return inviteInfo.getStreamInfo();
}
// 获取当前已下载时长
String mediaServerId = streamInfo.getMediaServerId();
String mediaServerId = inviteInfo.getStreamInfo().getMediaServerId();
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem == null) {
logger.warn("查询录像信息时发现节点已离线");
return null;
}
if (mediaServerItem.getRecordAssistPort() > 0) {
JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, streamInfo.getApp(), streamInfo.getStream(), null);
JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream(), null);
if (jsonObject == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "连接Assist服务失败");
}
@ -871,10 +946,10 @@ public class PlayServiceImpl implements IPlayService {
long duration = jsonObject.getLong("data");
if (duration == 0) {
streamInfo.setProgress(0);
inviteInfo.getStreamInfo().setProgress(0);
} else {
String startTime = streamInfo.getStartTime();
String endTime = streamInfo.getEndTime();
String startTime = inviteInfo.getStreamInfo().getStartTime();
String endTime = inviteInfo.getStreamInfo().getEndTime();
long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
@ -882,30 +957,31 @@ public class PlayServiceImpl implements IPlayService {
BigDecimal totalCount = new BigDecimal(end - start);
BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
double process = divide.doubleValue();
streamInfo.setProgress(process);
inviteInfo.getStreamInfo().setProgress(process);
}
inviteStreamService.updateInviteInfo(inviteInfo);
}
}
return inviteInfo.getStreamInfo();
}
return null;
}
private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, JSONObject response, String deviceId, String channelId, String startTime, String endTime) {
StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, response, deviceId, channelId);
if (streamInfo != null) {
streamInfo.setStartTime(startTime);
streamInfo.setEndTime(endTime);
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.DOWNLOAD, deviceId, channelId);
if (inviteInfo != null) {
inviteInfo.setStatus(InviteSessionStatus.ok);
inviteInfo.setStreamInfo(streamInfo);
inviteStreamService.updateInviteInfo(inviteInfo);
}
}
return streamInfo;
}
private void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String uuid) {
RequestMessage msg = new RequestMessage();
msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId);
msg.setId(uuid);
StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
if (streamInfo != null) {
redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
msg.setData(JSON.toJSONString(streamInfo));
resultHolder.invokeResult(msg);
} else {
logger.warn("设备预览API调用失败");
msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败"));
resultHolder.invokeResult(msg);
}
}
public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) {
String streamId = resonse.getString("stream");
@ -1007,15 +1083,14 @@ public class PlayServiceImpl implements IPlayService {
@Override
public void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException {
String key = redisCatchStorage.queryPlaybackForKey(null, null, streamId, null);
StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null);
if (null == streamInfo) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
if (null == inviteInfo || inviteInfo.getStreamInfo() == null) {
logger.warn("streamId不存在!");
throw new ServiceException("streamId不存在");
}
streamInfo.setPause(true);
redisTemplate.opsForValue().set(key, streamInfo);
MediaServerItem mediaServerItem = mediaServerService.getOne(streamInfo.getMediaServerId());
inviteInfo.getStreamInfo().setPause(true);
inviteStreamService.updateInviteInfo(inviteInfo);
MediaServerItem mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
if (null == mediaServerItem) {
logger.warn("mediaServer 不存在!");
throw new ServiceException("mediaServer不存在");
@ -1025,21 +1100,20 @@ public class PlayServiceImpl implements IPlayService {
if (jsonObject == null || jsonObject.getInteger("code") != 0) {
throw new ServiceException("暂停RTP接收失败");
}
Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
cmder.playPauseCmd(device, streamInfo);
Device device = storager.queryVideoDevice(inviteInfo.getDeviceId());
cmder.playPauseCmd(device, inviteInfo.getStreamInfo());
}
@Override
public void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException {
String key = redisCatchStorage.queryPlaybackForKey(null, null, streamId, null);
StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null);
if (null == streamInfo) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
if (null == inviteInfo || inviteInfo.getStreamInfo() == null) {
logger.warn("streamId不存在!");
throw new ServiceException("streamId不存在");
}
streamInfo.setPause(false);
redisTemplate.opsForValue().set(key, streamInfo);
MediaServerItem mediaServerItem = mediaServerService.getOne(streamInfo.getMediaServerId());
inviteInfo.getStreamInfo().setPause(false);
inviteStreamService.updateInviteInfo(inviteInfo);
MediaServerItem mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
if (null == mediaServerItem) {
logger.warn("mediaServer 不存在!");
throw new ServiceException("mediaServer不存在");
@ -1049,7 +1123,7 @@ public class PlayServiceImpl implements IPlayService {
if (jsonObject == null || jsonObject.getInteger("code") != 0) {
throw new ServiceException("继续RTP接收失败");
}
Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
cmder.playResumeCmd(device, streamInfo);
Device device = storager.queryVideoDevice(inviteInfo.getDeviceId());
cmder.playResumeCmd(device, inviteInfo.getStreamInfo());
}
}

View File

@ -1,14 +1,16 @@
package com.genersoft.iot.vmp.storager;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.SystemAllInfo;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
import java.util.List;
@ -23,14 +25,6 @@ public interface IRedisCatchStorage {
*/
Long getCSEQ();
boolean startPlayback(StreamInfo stream, String callId);
boolean stopPlayback(String deviceId, String channelId, String stream, String callId);
StreamInfo queryPlayback(String deviceId, String channelID, String stream, String callId);
String queryPlaybackForKey(String deviceId, String channelId, String stream, String callId);
void updatePlatformCatchInfo(ParentPlatformCatch parentPlatformCatch);
ParentPlatformCatch queryPlatformCatchInfo(String platformGbId);
@ -47,8 +41,6 @@ public interface IRedisCatchStorage {
void delPlatformRegisterInfo(String callId);
void cleanPlatformRegisterInfos();
void updateSendRTPSever(SendRtpItem sendRtpItem);
/**
@ -74,12 +66,6 @@ public interface IRedisCatchStorage {
*/
boolean isChannelSendingRTP(String channelId);
/**
*
* @param deviceId ID
*/
void clearCatchByDeviceId(String deviceId);
/**
* rediswvp
*/
@ -120,23 +106,6 @@ public interface IRedisCatchStorage {
*/
void removeStream(String mediaServerId, String type);
/**
*
* @param streamInfo
*/
boolean startDownload(StreamInfo streamInfo, String callId);
StreamInfo queryDownload(String deviceId, String channelId, String stream, String callId);
boolean stopDownload(String deviceId, String channelId, String stream, String callId);
/**
*
* @param queryKey
* @return
*/
ThirdPartyGB queryMemberNoGBId(String queryKey);
List<OnStreamChangedHookParam> getStreams(String mediaServerId, String pull);
/**

View File

@ -2,17 +2,18 @@ package com.genersoft.iot.vmp.storager.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.SystemAllInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
@ -92,160 +93,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
}
}
@Override
public boolean startPlayback(StreamInfo stream, String callId) {
redisTemplate.opsForValue().set(String.format("%S_%s_%s_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX,
userSetting.getServerId(), stream.getMediaServerId(), stream.getDeviceID(), stream.getChannelId(), stream.getStream(), callId), stream);
return true;
}
@Override
public boolean startDownload(StreamInfo stream, String callId) {
String key=String.format("%S_%s_%s_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX,
userSetting.getServerId(), stream.getMediaServerId(), stream.getDeviceID(), stream.getChannelId(), stream.getStream(), callId);
if (stream.getProgress() == 1) {
logger.debug("添加下载缓存==已完成下载=》{}",key);
redisTemplate.opsForValue().set(key, stream);
}else {
logger.debug("添加下载缓存==未完成下载=》{}",key);
Duration duration = Duration.ofSeconds(60*60L);
redisTemplate.opsForValue().set(key, stream, duration);
}
return true;
}
@Override
public boolean stopDownload(String deviceId, String channelId, String stream, String callId) {
DeviceChannel deviceChannel = deviceChannelMapper.queryChannel(deviceId, channelId);
if (deviceChannel != null) {
deviceChannel.setStreamId(null);
deviceChannel.setDeviceId(deviceId);
deviceChannelMapper.update(deviceChannel);
}
if (deviceId == null) {
deviceId = "*";
}
if (channelId == null) {
channelId = "*";
}
if (stream == null) {
stream = "*";
}
if (callId == null) {
callId = "*";
}
String key = String.format("%S_%s_*_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX,
userSetting.getServerId(),
deviceId,
channelId,
stream,
callId
);
List<Object> scan = RedisUtil.scan(redisTemplate, key);
if (scan.size() > 0) {
for (Object keyObj : scan) {
redisTemplate.delete(keyObj);
}
}
return true;
}
@Override
public boolean stopPlayback(String deviceId, String channelId, String stream, String callId) {
DeviceChannel deviceChannel = deviceChannelMapper.queryChannel(deviceId, channelId);
if (deviceChannel != null) {
deviceChannel.setStreamId(null);
deviceChannel.setDeviceId(deviceId);
deviceChannelMapper.update(deviceChannel);
}
if (deviceId == null) {
deviceId = "*";
}
if (channelId == null) {
channelId = "*";
}
if (stream == null) {
stream = "*";
}
if (callId == null) {
callId = "*";
}
String key = String.format("%S_%s_*_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX,
userSetting.getServerId(),
deviceId,
channelId,
stream,
callId
);
List<Object> scan = RedisUtil.scan(redisTemplate, key);
if (scan.size() > 0) {
for (Object keyObj : scan) {
redisTemplate.delete(keyObj);
}
}
return true;
}
@Override
public StreamInfo queryPlayback(String deviceId, String channelId, String stream, String callId) {
if (stream == null && callId == null) {
return null;
}
if (deviceId == null) {
deviceId = "*";
}
if (channelId == null) {
channelId = "*";
}
if (stream == null) {
stream = "*";
}
if (callId == null) {
callId = "*";
}
String key = String.format("%S_%s_*_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX,
userSetting.getServerId(),
deviceId,
channelId,
stream,
callId
);
List<Object> streamInfoScan = RedisUtil.scan(redisTemplate, key);
if (streamInfoScan.size() > 0) {
return (StreamInfo) redisTemplate.opsForValue().get(streamInfoScan.get(0));
}else {
return null;
}
}
@Override
public String queryPlaybackForKey(String deviceId, String channelId, String stream, String callId) {
if (stream == null && callId == null) {
return null;
}
if (deviceId == null) {
deviceId = "*";
}
if (channelId == null) {
channelId = "*";
}
if (stream == null) {
stream = "*";
}
if (callId == null) {
callId = "*";
}
String key = String.format("%S_%s_*_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX,
userSetting.getServerId(),
deviceId,
channelId,
stream,
callId
);
List<Object> streamInfoScan = RedisUtil.scan(redisTemplate, key);
return (String) streamInfoScan.get(0);
}
@Override
public void updatePlatformCatchInfo(ParentPlatformCatch parentPlatformCatch) {
String key = VideoManagerConstants.PLATFORM_CATCH_PREFIX + userSetting.getServerId() + "_" + parentPlatformCatch.getId();
@ -291,14 +138,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
redisTemplate.delete(VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetting.getServerId() + "_" + callId);
}
@Override
public void cleanPlatformRegisterInfos() {
List regInfos = RedisUtil.scan(redisTemplate, VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetting.getServerId() + "_" + "*");
for (Object key : regInfos) {
redisTemplate.delete(key.toString());
}
}
@Override
public void updateSendRTPSever(SendRtpItem sendRtpItem) {
@ -455,36 +294,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
return RtpStreams.size() > 0;
}
@Override
public void clearCatchByDeviceId(String deviceId) {
List<Object> playLeys = RedisUtil.scan(redisTemplate, String.format("%S_%s_*_%s_*", VideoManagerConstants.PLAYER_PREFIX,
userSetting.getServerId(),
deviceId));
if (playLeys.size() > 0) {
for (Object key : playLeys) {
redisTemplate.delete(key.toString());
}
}
List<Object> playBackers = RedisUtil.scan(redisTemplate, String.format("%S_%s_*_%s_*_*_*", VideoManagerConstants.PLAY_BLACK_PREFIX,
userSetting.getServerId(),
deviceId));
if (playBackers.size() > 0) {
for (Object key : playBackers) {
redisTemplate.delete(key.toString());
}
}
List<Object> deviceCache = RedisUtil.scan(redisTemplate, String.format("%S%s_%s", VideoManagerConstants.DEVICE_PREFIX,
userSetting.getServerId(),
deviceId));
if (deviceCache.size() > 0) {
for (Object key : deviceCache) {
redisTemplate.delete(key.toString());
}
}
}
@Override
public void updateWVPInfo(JSONObject jsonObject, int time) {
String key = VideoManagerConstants.WVP_SERVER_PREFIX + userSetting.getServerId();
@ -516,44 +325,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
redisTemplate.delete(key);
}
@Override
public StreamInfo queryDownload(String deviceId, String channelId, String stream, String callId) {
if (stream == null && callId == null) {
return null;
}
if (deviceId == null) {
deviceId = "*";
}
if (channelId == null) {
channelId = "*";
}
if (stream == null) {
stream = "*";
}
if (callId == null) {
callId = "*";
}
String key = String.format("%S_%s_*_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX,
userSetting.getServerId(),
deviceId,
channelId,
stream,
callId
);
List<Object> streamInfoScan = RedisUtil.scan(redisTemplate, key);
if (streamInfoScan.size() > 0) {
return (StreamInfo) redisTemplate.opsForValue().get(streamInfoScan.get(0));
}else {
return null;
}
}
@Override
public ThirdPartyGB queryMemberNoGBId(String queryKey) {
String key = VideoManagerConstants.WVP_STREAM_GB_ID_PREFIX + queryKey;
return JsonUtil.redisJsonToObject(redisTemplate, key, ThirdPartyGB.class);
}
@Override
public void removeStream(String mediaServerId, String type) {
String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_*_*_" + mediaServerId;

View File

@ -14,6 +14,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.BaseTree;
@ -63,6 +64,9 @@ public class DeviceQuery {
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IInviteStreamService inviteStreamService;
@Autowired
private SIPCommander cmder;
@ -184,7 +188,7 @@ public class DeviceQuery {
// 清除redis记录
boolean isSuccess = deviceService.delete(deviceId);
if (isSuccess) {
redisCatchStorage.clearCatchByDeviceId(deviceId);
inviteStreamService.clearInviteInfo(deviceId);
// 停止此设备的订阅更新
Set<String> allKeys = dynamicTask.getAllKeys();
for (String key : allKeys) {

View File

@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.vmanager.gb28181.play;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteInfo;
@ -116,7 +115,7 @@ public class PlayController {
// 录像查询以channelId作为deviceId查询
resultHolder.put(key, uuid, result);
playService.play(newMediaServerItem, deviceId, channelId, ((code, msg, data) -> {
playService.play(newMediaServerItem, deviceId, channelId, (code, msg, data) -> {
WVPResult<StreamContent> wvpResult = new WVPResult<>();
if (code == InviteErrorCode.SUCCESS.getCode()) {
wvpResult.setCode(ErrorCode.SUCCESS.getCode());
@ -133,10 +132,9 @@ public class PlayController {
wvpResult.setCode(code);
wvpResult.setMsg(msg);
}
System.out.println(JSON.toJSONString(wvpResult));
requestMessage.setData(wvpResult);
resultHolder.invokeResult(requestMessage);
}));
});
return result;
}

View File

@ -1,15 +1,22 @@
package com.genersoft.iot.vmp.vmanager.gb28181.playback;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.exception.ServiceException;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
@ -20,17 +27,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.springframework.web.context.request.async.DeferredResult;
import javax.servlet.http.HttpServletRequest;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
@ -59,6 +62,9 @@ public class PlaybackController {
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IInviteStreamService inviteStreamService;
@Autowired
private IPlayService playService;
@ -74,7 +80,7 @@ public class PlaybackController {
@Parameter(name = "startTime", description = "开始时间", required = true)
@Parameter(name = "endTime", description = "结束时间", required = true)
@GetMapping("/start/{deviceId}/{channelId}")
public DeferredResult<WVPResult<StreamContent>> start(@PathVariable String deviceId, @PathVariable String channelId,
public DeferredResult<WVPResult<StreamContent>> start(HttpServletRequest request, @PathVariable String deviceId, @PathVariable String channelId,
String startTime, String endTime) {
if (logger.isDebugEnabled()) {
@ -86,22 +92,31 @@ public class PlaybackController {
DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
resultHolder.put(key, uuid, result);
RequestMessage requestMessage = new RequestMessage();
requestMessage.setKey(key);
requestMessage.setId(uuid);
playService.playBack(deviceId, channelId, startTime, endTime,
(code, msg, data)->{
WVPResult<StreamContent> wvpResult = new WVPResult<>();
if (code == InviteErrorCode.SUCCESS.getCode()) {
wvpResult.setCode(ErrorCode.SUCCESS.getCode());
wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setId(uuid);
playService.playBack(deviceId, channelId, startTime, endTime, null,
playBackResult->{
wvpResult.setCode(playBackResult.getCode());
wvpResult.setMsg(playBackResult.getMsg());
if (playBackResult.getCode() == ErrorCode.SUCCESS.getCode()) {
StreamInfo streamInfo = (StreamInfo)playBackResult.getData();
if (data != null) {
StreamInfo streamInfo = (StreamInfo)data;
if (userSetting.getUseSourceIpAsStreamIp()) {
streamInfo.channgeStreamIp(request.getLocalName());
}
wvpResult.setData(new StreamContent(streamInfo));
}
msg.setData(wvpResult);
resultHolder.invokeResult(msg);
}else {
wvpResult.setCode(code);
wvpResult.setMsg(msg);
}
requestMessage.setData(wvpResult);
resultHolder.invokeResult(requestMessage);
});
return result;
@ -169,14 +184,15 @@ public class PlaybackController {
@GetMapping("/seek/{streamId}/{seekTime}")
public void playSeek(@PathVariable String streamId, @PathVariable long seekTime) {
logger.info("playSeek: "+streamId+", "+seekTime);
StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null);
if (null == streamInfo) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
if (null == inviteInfo || inviteInfo.getStreamInfo() == null) {
logger.warn("streamId不存在!");
throw new ControllerException(ErrorCode.ERROR400.getCode(), "streamId不存在");
}
Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
Device device = storager.queryVideoDevice(inviteInfo.getDeviceId());
try {
cmder.playSeekCmd(device, streamInfo, seekTime);
cmder.playSeekCmd(device, inviteInfo.getStreamInfo(), seekTime);
} catch (InvalidArgumentException | ParseException | SipException e) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), e.getMessage());
}
@ -188,8 +204,9 @@ public class PlaybackController {
@GetMapping("/speed/{streamId}/{speed}")
public void playSpeed(@PathVariable String streamId, @PathVariable Double speed) {
logger.info("playSpeed: "+streamId+", "+speed);
StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null);
if (null == streamInfo) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
if (null == inviteInfo || inviteInfo.getStreamInfo() == null) {
logger.warn("streamId不存在!");
throw new ControllerException(ErrorCode.ERROR400.getCode(), "streamId不存在");
}
@ -197,9 +214,9 @@ public class PlaybackController {
logger.warn("不支持的speed " + speed);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "不支持的speed0.25 0.5 1、2、4");
}
Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
Device device = storager.queryVideoDevice(inviteInfo.getDeviceId());
try {
cmder.playSpeedCmd(device, streamInfo, speed);
cmder.playSpeedCmd(device, inviteInfo.getStreamInfo(), speed);
} catch (InvalidArgumentException | ParseException | SipException e) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), e.getMessage());
}

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.vmanager.gb28181.record;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
@ -10,6 +11,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@ -27,6 +29,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import javax.servlet.http.HttpServletRequest;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
@ -55,8 +58,8 @@ public class GBRecordController {
@Autowired
private IDeviceService deviceService;
@Autowired
private UserSetting userSetting;
@Operation(summary = "录像查询")
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@ -119,7 +122,7 @@ public class GBRecordController {
@Parameter(name = "endTime", description = "结束时间", required = true)
@Parameter(name = "downloadSpeed", description = "下载倍速", required = true)
@GetMapping("/download/start/{deviceId}/{channelId}")
public DeferredResult<WVPResult<StreamContent>> download(@PathVariable String deviceId, @PathVariable String channelId,
public DeferredResult<WVPResult<StreamContent>> download(HttpServletRequest request, @PathVariable String deviceId, @PathVariable String channelId,
String startTime, String endTime, String downloadSpeed) {
if (logger.isDebugEnabled()) {
@ -130,22 +133,32 @@ public class GBRecordController {
String key = DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId;
DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>(30000L);
resultHolder.put(key, uuid, result);
RequestMessage msg = new RequestMessage();
msg.setId(uuid);
msg.setKey(key);
RequestMessage requestMessage = new RequestMessage();
requestMessage.setId(uuid);
requestMessage.setKey(key);
playService.download(deviceId, channelId, startTime, endTime, Integer.parseInt(downloadSpeed),
(code, msg, data)->{
WVPResult<StreamContent> wvpResult = new WVPResult<>();
if (code == InviteErrorCode.SUCCESS.getCode()) {
wvpResult.setCode(ErrorCode.SUCCESS.getCode());
wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
playService.download(deviceId, channelId, startTime, endTime, Integer.parseInt(downloadSpeed), null, playBackResult->{
wvpResult.setCode(playBackResult.getCode());
wvpResult.setMsg(playBackResult.getMsg());
if (playBackResult.getCode() == ErrorCode.SUCCESS.getCode()) {
StreamInfo streamInfo = (StreamInfo)playBackResult.getData();
if (data != null) {
StreamInfo streamInfo = (StreamInfo)data;
if (userSetting.getUseSourceIpAsStreamIp()) {
streamInfo.channgeStreamIp(request.getLocalName());
}
wvpResult.setData(new StreamContent(streamInfo));
}
msg.setData(wvpResult);
resultHolder.invokeResult(msg);
}else {
wvpResult.setCode(code);
wvpResult.setMsg(msg);
}
requestMessage.setData(wvpResult);
resultHolder.invokeResult(requestMessage);
});
return result;