支持全局固定流地址

pull/993/head
648540858 2023-08-07 17:00:01 +08:00
parent fa13b22819
commit 470625e077
6 changed files with 88 additions and 62 deletions

View File

@ -459,7 +459,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setApp("rtp"); sendRtpItem.setApp("rtp");
if ("Playback".equalsIgnoreCase(sessionName)) { if ("Playback".equalsIgnoreCase(sessionName)) {
sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); String startTimeStr = DateUtil.urlFormatter.format(start);
String endTimeStr = DateUtil.urlFormatter.format(end);
String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr;
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
sendRtpItem.setStreamId(ssrcInfo.getStream()); sendRtpItem.setStreamId(ssrcInfo.getStream());
// 写入redis 超时时回复 // 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);
@ -520,12 +523,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
})); }));
sendRtpItem.setPlayType(InviteStreamType.PLAY); sendRtpItem.setPlayType(InviteStreamType.PLAY);
String streamId = null; String streamId = String.format("%s_%s", device.getDeviceId(), channelId);
if (mediaServerItem.isRtpEnable()) {
streamId = String.format("%s_%s", device.getDeviceId(), channelId);
}else {
streamId = String.format("%08x", Integer.parseInt(ssrcInfo.getSsrc())).toUpperCase();
}
sendRtpItem.setStreamId(streamId); sendRtpItem.setStreamId(streamId);
sendRtpItem.setSsrc(ssrcInfo.getSsrc()); sendRtpItem.setSsrc(ssrcInfo.getSsrc());
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);

View File

@ -193,7 +193,10 @@ public class ZLMHttpHookListener {
String mediaServerId = json.getString("mediaServerId"); String mediaServerId = json.getString("mediaServerId");
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo == null) {
return new HookResultForOnPublish(200, "success");
}
// 推流鉴权的处理
if (!"rtp".equals(param.getApp())) { if (!"rtp".equals(param.getApp())) {
if (userSetting.getPushAuthority()) { if (userSetting.getPushAuthority()) {
// 推流鉴权 // 推流鉴权
@ -245,11 +248,21 @@ public class ZLMHttpHookListener {
} }
}); });
// 是否录像
if ("rtp".equals(param.getApp())) { if ("rtp".equals(param.getApp())) {
result.setEnable_mp4(userSetting.getRecordSip()); result.setEnable_mp4(userSetting.getRecordSip());
} else { } else {
result.setEnable_mp4(userSetting.isRecordPushLive()); result.setEnable_mp4(userSetting.isRecordPushLive());
} }
// 替换流地址
if ("rtp".equals(param.getApp()) && !mediaInfo.isRtpEnable()) {
String ssrc = String.format("%010d", Long.parseLong(param.getStream(), 16));;
InviteInfo inviteInfo = inviteStreamService.getInviteInfoBySSRC(ssrc);
if (inviteInfo != null) {
result.setStream_replace(inviteInfo.getStream());
logger.info("[ZLM HOOK]推流鉴权 stream: {} 替换为 {}", param.getStream(), inviteInfo.getStream());
}
}
List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, param.getStream()); List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, param.getStream());
if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) { if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) {
String deviceId = ssrcTransactionForAll.get(0).getDeviceId(); String deviceId = ssrcTransactionForAll.get(0).getDeviceId();
@ -562,7 +575,7 @@ public class ZLMHttpHookListener {
if ("rtp".equals(param.getApp())) { if ("rtp".equals(param.getApp())) {
String[] s = param.getStream().split("_"); String[] s = param.getStream().split("_");
if (!mediaInfo.isRtpEnable() || (s.length != 2 && s.length != 4)) { if ((s.length != 2 && s.length != 4)) {
defaultResult.setResult(HookResult.SUCCESS()); defaultResult.setResult(HookResult.SUCCESS());
return defaultResult; return defaultResult;
} }
@ -591,7 +604,6 @@ public class ZLMHttpHookListener {
result.onTimeout(() -> { result.onTimeout(() -> {
logger.info("[ZLM HOOK] 预览流自动点播, 等待超时"); logger.info("[ZLM HOOK] 预览流自动点播, 等待超时");
// 释放rtpserver
msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "点播超时")); msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "点播超时"));
resultHolder.invokeResult(msg); resultHolder.invokeResult(msg);
}); });

View File

@ -6,6 +6,7 @@ public class HookResultForOnPublish extends HookResult{
private boolean enable_mp4; private boolean enable_mp4;
private int mp4_max_second; private int mp4_max_second;
private String mp4_save_path; private String mp4_save_path;
private String stream_replace;
public HookResultForOnPublish() { public HookResultForOnPublish() {
} }
@ -51,12 +52,21 @@ public class HookResultForOnPublish extends HookResult{
this.mp4_save_path = mp4_save_path; this.mp4_save_path = mp4_save_path;
} }
public String getStream_replace() {
return stream_replace;
}
public void setStream_replace(String stream_replace) {
this.stream_replace = stream_replace;
}
@Override @Override
public String toString() { public String toString() {
return "HookResultForOnPublish{" + return "HookResultForOnPublish{" +
"enable_audio=" + enable_audio + "enable_audio=" + enable_audio +
", enable_mp4=" + enable_mp4 + ", enable_mp4=" + enable_mp4 +
", mp4_max_second=" + mp4_max_second + ", mp4_max_second=" + mp4_max_second +
", stream_replace=" + stream_replace +
", mp4_save_path='" + mp4_save_path + '\'' + ", mp4_save_path='" + mp4_save_path + '\'' +
'}'; '}';
} }

View File

@ -74,5 +74,13 @@ public interface IInviteStreamService {
int getStreamInfoCount(String mediaServerId); int getStreamInfoCount(String mediaServerId);
/**
* MediaServer
*/
InviteInfo getInviteInfoBySSRC(String ssrc);
/**
* ssrc
*/
InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrcInResponse);
} }

View File

@ -80,7 +80,8 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
":" + inviteInfoForUpdate.getType() + ":" + inviteInfoForUpdate.getType() +
":" + inviteInfoForUpdate.getDeviceId() + ":" + inviteInfoForUpdate.getDeviceId() +
":" + inviteInfoForUpdate.getChannelId() + ":" + inviteInfoForUpdate.getChannelId() +
":" + inviteInfoForUpdate.getStream(); ":" + inviteInfoForUpdate.getStream()+
":" + inviteInfoForUpdate.getSsrcInfo().getSsrc();
redisTemplate.opsForValue().set(key, inviteInfoForUpdate); redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
} }
@ -96,7 +97,8 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
":" + inviteInfo.getType() + ":" + inviteInfo.getType() +
":" + inviteInfo.getDeviceId() + ":" + inviteInfo.getDeviceId() +
":" + inviteInfo.getChannelId() + ":" + inviteInfo.getChannelId() +
":" + stream; ":" + stream +
":" + inviteInfo.getSsrcInfo().getSsrc();
inviteInfoInDb.setStream(stream); inviteInfoInDb.setStream(stream);
if (inviteInfoInDb.getSsrcInfo() != null) { if (inviteInfoInDb.getSsrcInfo() != null) {
inviteInfoInDb.getSsrcInfo().setStream(stream); inviteInfoInDb.getSsrcInfo().setStream(stream);
@ -111,7 +113,8 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
":" + (type != null ? type : "*") + ":" + (type != null ? type : "*") +
":" + (deviceId != null ? deviceId : "*") + ":" + (deviceId != null ? deviceId : "*") +
":" + (channelId != null ? channelId : "*") + ":" + (channelId != null ? channelId : "*") +
":" + (stream != null ? stream : "*"); ":" + (stream != null ? stream : "*")
+ ":*";
List<Object> scanResult = RedisUtil.scan(redisTemplate, key); List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.size() != 1) { if (scanResult.size() != 1) {
return null; return null;
@ -136,7 +139,8 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
":" + (type != null ? type : "*") + ":" + (type != null ? type : "*") +
":" + (deviceId != null ? deviceId : "*") + ":" + (deviceId != null ? deviceId : "*") +
":" + (channelId != null ? channelId : "*") + ":" + (channelId != null ? channelId : "*") +
":" + (stream != null ? stream : "*"); ":" + (stream != null ? stream : "*") +
":*";
List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey); List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey);
if (scanResult.size() > 0) { if (scanResult.size() > 0) {
for (Object keyObj : scanResult) { for (Object keyObj : scanResult) {
@ -191,7 +195,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
@Override @Override
public int getStreamInfoCount(String mediaServerId) { public int getStreamInfoCount(String mediaServerId) {
int count = 0; int count = 0;
String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*"; String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:*";
List<Object> scanResult = RedisUtil.scan(redisTemplate, key); List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.size() == 0) { if (scanResult.size() == 0) {
return 0; return 0;
@ -229,4 +233,35 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
} }
return key; return key;
} }
@Override
public InviteInfo getInviteInfoBySSRC(String ssrc) {
String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:" + ssrc;
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.size() != 1) {
return null;
}
return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
}
@Override
public InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrc) {
InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
if (inviteInfoInDb == null) {
return null;
}
removeInviteInfo(inviteInfoInDb);
String key = VideoManagerConstants.INVITE_PREFIX +
":" + inviteInfo.getType() +
":" + inviteInfo.getDeviceId() +
":" + inviteInfo.getChannelId() +
":" + inviteInfo.getStream() +
":" + inviteInfo.getSsrcInfo().getSsrc();
if (inviteInfoInDb.getSsrcInfo() != null) {
inviteInfoInDb.getSsrcInfo().setSsrc(ssrc);
}
redisTemplate.opsForValue().set(key, inviteInfoInDb);
return inviteInfoInDb;
}
} }

View File

@ -158,10 +158,7 @@ public class PlayServiceImpl implements IPlayService {
} }
} }
} }
String streamId = null; String streamId = String.format("%s_%s", device.getDeviceId(), channelId);;
if (mediaServerItem.isRtpEnable()) {
streamId = String.format("%s_%s", device.getDeviceId(), channelId);
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam()); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam());
if (ssrcInfo == null) { if (ssrcInfo == null) {
callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null); callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null);
@ -457,16 +454,13 @@ public class PlayServiceImpl implements IPlayService {
logger.warn("[录像回放] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId); logger.warn("[录像回放] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "单端口收流时不支持TCP主动方式收流"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "单端口收流时不支持TCP主动方式收流");
} }
String stream = null; String startTimeStr = startTime.replace("-", "")
if (newMediaServerItem.isRtpEnable()) { .replace(":", "")
String startTimeStr = startTime.replace("-", "") .replace(" ", "");
.replace(":", "") String endTimeTimeStr = endTime.replace("-", "")
.replace(" ", ""); .replace(":", "")
String endTimeTimeStr = endTime.replace("-", "") .replace(" ", "");
.replace(":", "") String stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr;
.replace(" ", "");
stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr;
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback); playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback);
} }
@ -628,44 +622,13 @@ public class PlayServiceImpl implements IPlayService {
if (ssrcInResponse != null) { if (ssrcInResponse != null) {
// 单端口 // 单端口
// 重新订阅流上线 // 重新订阅流上线
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp",
ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
subscribe.removeSubscribe(hookSubscribe);
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(inviteInfo.getDeviceId(), SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(inviteInfo.getDeviceId(),
inviteInfo.getChannelId(), null, inviteInfo.getStream()); inviteInfo.getChannelId(), null, inviteInfo.getStream());
streamSession.remove(inviteInfo.getDeviceId(), streamSession.remove(inviteInfo.getDeviceId(),
inviteInfo.getChannelId(), inviteInfo.getStream()); inviteInfo.getChannelId(), inviteInfo.getStream());
inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse);
String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
hookSubscribe.getContent().put("stream", stream);
inviteStreamService.updateInviteInfoForStream(inviteInfo, stream);
streamSession.put(device.getDeviceId(), channelId, ssrcTransaction.getCallId(), streamSession.put(device.getDeviceId(), channelId, ssrcTransaction.getCallId(),
stream, ssrcInResponse, mediaServerItem.getId(), (SIPResponse) responseEvent.getResponse(), inviteSessionType); inviteInfo.getStream(), ssrcInResponse, mediaServerItem.getId(), (SIPResponse) responseEvent.getResponse(), inviteSessionType);
subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
logger.info("[Invite 200OK] ssrc修正后收到订阅消息 " + hookParam);
dynamicTask.stop(timeOutTaskKey);
subscribe.removeSubscribe(hookSubscribe);
// hook响应
StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, hookParam, device.getDeviceId(), channelId);
if (streamInfo == null){
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
inviteStreamService.call(inviteSessionType, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
return;
}
callback.run(InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(), streamInfo);
inviteStreamService.call(inviteSessionType, device.getDeviceId(), channelId, null,
InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(),
streamInfo);
if (inviteSessionType == InviteSessionType.PLAY) {
snapOnPlay(mediaServerItemInUse, device.getDeviceId(), channelId, stream);
}
});
} }
} }
} }