合并主线

结构优化
648540858 2023-06-27 17:18:46 +08:00
parent 5a7a7a12bd
commit f961515317
7 changed files with 45 additions and 54 deletions

View File

@ -56,7 +56,7 @@ public class SIPRequestHeaderPlarformProvider {
//via
ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
ViaHeader viaHeader = SipFactory.getInstance().createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(),
Integer.parseInt(parentPlatform.getDevicePort()), parentPlatform.getTransport(), SipUtils.getNewViaTag());
parentPlatform.getDevicePort(), parentPlatform.getTransport(), SipUtils.getNewViaTag());
viaHeader.setRPort();
viaHeaders.add(viaHeader);
//from

View File

@ -6,9 +6,7 @@ import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
@ -624,9 +622,9 @@ public class SIPCommander implements ISIPCommander {
logger.info("[语音喊话] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), sendRtpItem.getPort());
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
subscribe.addSubscribe(hookSubscribeForStreamChange, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
subscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItemInUse, hookParam) -> {
if (event != null) {
event.response(mediaServerItemInUse, json);
event.response(mediaServerItemInUse, hookParam);
subscribe.removeSubscribe(hookSubscribeForStreamChange);
}
});
@ -634,9 +632,9 @@ public class SIPCommander implements ISIPCommander {
CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport());
callIdHeader.setCallId(callId);
HookSubscribeForStreamPush hookSubscribeForStreamPush = HookSubscribeFactory.on_publish("rtp", stream, null, mediaServerItem.getId());
subscribe.addSubscribe(hookSubscribeForStreamPush, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
subscribe.addSubscribe(hookSubscribeForStreamPush, (mediaServerItemInUse, hookParam) -> {
if (eventForPush != null) {
eventForPush.response(mediaServerItemInUse, json);
eventForPush.response(mediaServerItemInUse, hookParam);
}
});
//

View File

@ -19,6 +19,7 @@ import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@ -899,9 +900,9 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
if (event != null) {
event.response(mediaServerItemInUse, json);
event.response(mediaServerItemInUse, hookParam);
subscribe.removeSubscribe(hookSubscribe);
}
});

View File

@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessag
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlatformService;
@ -127,10 +128,9 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
// 消息发送成功, 向上级发送invite获取推流
try {
platformService.broadcastInvite(platform, deviceChannel.getChannelId(), mediaServerForMinimumLoad, (mediaServerItem, response)->{
platformService.broadcastInvite(platform, deviceChannel.getChannelId(), mediaServerForMinimumLoad, (mediaServerItem, hookParam)->{
OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
// 上级平台推流成功
String app = response.getString("app");
String stream = response.getString("stream");
AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(device.getDeviceId(), targetId);
if (broadcastCatch != null ) {
if (playService.audioBroadcastInUse(device, targetId)) {
@ -138,24 +138,24 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
platform.getServerGBId(), deviceChannel.getChannelId());
// 查看语音通道已经建立且已经占用 回复BYE
try {
platformService.stopBroadcast(platform, deviceChannel.getChannelId(), stream);
platformService.stopBroadcast(platform, deviceChannel.getChannelId(), streamChangedHookParam.getStream());
} catch (InvalidArgumentException | ParseException | SsrcTransactionNotFoundException |
SipException e) {
logger.info("[消息发送失败] 国标级联 语音喊话 platform {} channel: {}", platform.getServerGBId(), deviceChannel.getChannelId());
}
}else {
// 查看语音通道已经建立但是未占用
broadcastCatch.setApp(app);
broadcastCatch.setStream(stream);
broadcastCatch.setApp(streamChangedHookParam.getApp());
broadcastCatch.setStream(streamChangedHookParam.getStream());
broadcastCatch.setMediaServerItem(mediaServerItem);
audioBroadcastManager.update(broadcastCatch);
// 推流到设备
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, stream, null);
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, streamChangedHookParam.getStream(), null);
if (sendRtpItem == null) {
logger.warn("[国标级联] 语音喊话 异常,未找到发流信息, channelId: {}, stream: {}", targetId, stream);
logger.info("[国标级联] 语音喊话 重新开始channelId: {}, stream: {}", targetId, stream);
logger.warn("[国标级联] 语音喊话 异常,未找到发流信息, channelId: {}, stream: {}", targetId, streamChangedHookParam.getStream());
logger.info("[国标级联] 语音喊话 重新开始channelId: {}, stream: {}", targetId, streamChangedHookParam.getStream());
try {
playService.audioBroadcastCmd(device, targetId, mediaServerItem, app, stream, 60, true, msg -> {
playService.audioBroadcastCmd(device, targetId, mediaServerItem, streamChangedHookParam.getApp(), streamChangedHookParam.getStream(), 60, true, msg -> {
logger.info("[语音喊话] 通道建立成功, device: {}, channel: {}", device.getDeviceId(), targetId);
});
} catch (SipException | InvalidArgumentException | ParseException e) {
@ -173,7 +173,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
}
}else {
try {
playService.audioBroadcastCmd(device, targetId, mediaServerItem, app, stream, 60, true, msg -> {
playService.audioBroadcastCmd(device, targetId, mediaServerItem, streamChangedHookParam.getApp(), streamChangedHookParam.getStream(), 60, true, msg -> {
logger.info("[语音喊话] 通道建立成功, device: {}, channel: {}", device.getDeviceId(), targetId);
});
} catch (SipException | InvalidArgumentException | ParseException e) {

View File

@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
@ -28,7 +29,7 @@ public interface IPlayService {
ErrorCallback<Object> callback);
SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, ErrorCallback<Object> callback);
StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId);
StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId);
MediaServerItem getNewMediaServerItem(Device device);

View File

@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.DynamicTask;
@ -16,6 +15,7 @@ import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlatformService;
@ -442,10 +442,11 @@ public class PlatformServiceImpl implements IPlatformService {
inviteStreamService.removeInviteInfo(inviteInfo);
}else {
// 流确实尚在推流,直接回调结果
JSONObject json = new JSONObject();
json.put("app", inviteInfo.getStreamInfo().getApp());
json.put("stream", inviteInfo.getStreamInfo().getStream());
hookEvent.response(mediaServerItemForStreamInfo, json);
OnStreamChangedHookParam hookParam = new OnStreamChangedHookParam();
hookParam.setApp(inviteInfo.getStreamInfo().getApp());
hookParam.setStream(inviteInfo.getStreamInfo().getStream());
hookEvent.response(mediaServerItemForStreamInfo, hookParam);
return;
}
}
@ -498,14 +499,14 @@ public class PlatformServiceImpl implements IPlatformService {
}
}
}, userSetting.getPlayTimeout());
commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, response)->{
commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, hookParam)->{
logger.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId);
dynamicTask.stop(timeOutTaskKey);
// hook响应
playService.onPublishHandlerForPlay(mediaServerItemForInvite, response, platform.getServerGBId(), channelId);
playService.onPublishHandlerForPlay(mediaServerItemForInvite, hookParam, platform.getServerGBId(), channelId);
// 收到流
if (hookEvent != null) {
hookEvent.response(mediaServerItem, response);
hookEvent.response(mediaServerItem, hookParam);
}
}, event -> {
// 收到200OK 检测ssrc是否有变化防止上级自定义了ssrc
@ -524,30 +525,20 @@ public class PlatformServiceImpl implements IPlatformService {
logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
if (!mediaServerItem.isRtpEnable()) {
logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
if (!ssrcFactory.checkSsrc(mediaServerItem.getId(), ssrcInResponse)) {
// ssrc 不可用
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
event.msg = "下级自定义了ssrc,但是此ssrc不可用";
event.statusCode = 400;
errorEvent.response(event);
return;
}
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
// 单端口模式streamId也有变化需要重新设置监听
if (!mediaServerItem.isRtpEnable()) {
// 添加订阅
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
subscribe.removeSubscribe(hookSubscribe);
hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + response.toJSONString());
subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + hookParam);
dynamicTask.stop(timeOutTaskKey);
// hook响应
playService.onPublishHandlerForPlay(mediaServerItemInUse, response, platform.getServerGBId(), channelId);
hookEvent.response(mediaServerItemInUse, response);
playService.onPublishHandlerForPlay(mediaServerItemInUse, hookParam, platform.getServerGBId(), channelId);
hookEvent.response(mediaServerItemInUse, hookParam);
});
}
// 关闭rtp server

View File

@ -293,12 +293,12 @@ public class PlayServiceImpl implements IPlayService {
// 查看设备是否已经在推流
try {
cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
logger.info("[语音对讲] 流已生成, 开始推流: " + response.toJSONString());
cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (mediaServerItemInuse, hookParam) -> {
logger.info("[语音对讲] 流已生成, 开始推流: " + hookParam);
dynamicTask.stop(timeOutTaskKey);
// TODO 暂不做处理
}, (MediaServerItem mediaServerItemInuse, JSONObject json) -> {
logger.info("[语音对讲] 设备开始推流: " + json.toJSONString());
}, (mediaServerItemInuse, hookParam) -> {
logger.info("[语音对讲] 设备开始推流: " + hookParam);
dynamicTask.stop(timeOutTaskKey);
}, (event) -> {
@ -617,10 +617,10 @@ public class PlayServiceImpl implements IPlayService {
}
@Override
public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) {
StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId) {
OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam;
StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
Device device = redisCatchStorage.getDevice(deviceId);
OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
if (streamInfo != null) {
DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
if (deviceChannel != null) {
@ -1571,7 +1571,7 @@ public class PlayServiceImpl implements IPlayService {
}
}
talk(mediaServerItem, device, channelId, stream, (MediaServerItem mediaServerItem1, JSONObject response) -> {
talk(mediaServerItem, device, channelId, stream, (mediaServerItem1, hookParam) -> {
logger.info("[语音对讲] 收到设备发来的流");
}, eventResult -> {
logger.warn("[语音对讲] 失败,{}/{}, 错误码 {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg);