合并分支

pull/1450/head
648540858 2024-04-25 17:53:24 +08:00
parent b89af9a693
commit 39c3274294
12 changed files with 115 additions and 444 deletions

View File

@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController; import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -45,9 +44,6 @@ public class RedisRpcConfig implements MessageListener {
@Autowired @Autowired
private RedisTemplate<Object, Object> redisTemplate; private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private ZlmHttpHookSubscribe hookSubscribe;
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor") @Qualifier("taskExecutor")

View File

@ -15,9 +15,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP
import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@ -121,22 +119,20 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
logger.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId()); logger.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId());
if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
// 查询这路流是否是本平台的
StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream());
if (push!= null && !push.isSelf()) {
// 不是本平台的就发送redis消息让其他wvp停止发流 // 不是本平台的就发送redis消息让其他wvp停止发流
ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
if (platform != null) { if (platform != null) {
RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId()); redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg); if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
} redisRpcService.stopSendRtp(sendRtpItem.getRedisKey());
redisCatchStorage.deleteSendRTPServer(null, null, sendRtpItem.getCallId(), null);
}else { }else {
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), redisCatchStorage.deleteSendRTPServer(null, null, callIdHeader.getCallId(), null);
callIdHeader.getCallId(), null); mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
if (userSetting.getUseCustomSsrcForParentInvite()) { if (userSetting.getUseCustomSsrcForParentInvite()) {
mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); mediaServerService.releaseSsrc(mediaServer.getId(), sendRtpItem.getSsrc());
}
} }
}else { }else {
logger.info("[上级平台停止观看] 未找到平台{}的信息发送redis消息失败", sendRtpItem.getPlatformId()); logger.info("[上级平台停止观看] 未找到平台{}的信息发送redis消息失败", sendRtpItem.getPlatformId());

View File

@ -19,12 +19,12 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.IInviteStreamService;
@ -34,7 +34,6 @@ import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.*;
@ -491,7 +490,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
String startTimeStr = DateUtil.urlFormatter.format(start); String startTimeStr = DateUtil.urlFormatter.format(start);
String endTimeStr = DateUtil.urlFormatter.format(end); String endTimeStr = DateUtil.urlFormatter.format(end);
String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr; String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr;
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0,false,!channel.isHasAudio(), false, device.getStreamModeForParam()); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0,false,!channel.getHasAudio(), false, device.getStreamModeForParam());
sendRtpItem.setStream(stream); sendRtpItem.setStream(stream);
// 写入redis 超时时回复 // 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);
@ -521,7 +520,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD); sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false,!channel.isHasAudio(), false, device.getStreamModeForParam()); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false,!channel.getHasAudio(), false, device.getStreamModeForParam());
sendRtpItem.setStream(ssrcInfo.getStream()); sendRtpItem.setStream(ssrcInfo.getStream());
// 写入redis 超时时回复 // 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);
@ -596,11 +595,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// 从redis查询是否正在接收这个推流 // 从redis查询是否正在接收这个推流
StreamPushItem pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); StreamPushItem pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
if (pushListItem != null) { if (pushListItem != null) {
sendRtpItem.setServerId(pushListItem.getSeverId()); sendRtpItem.setServerId(pushListItem.getServerId());
sendRtpItem.setMediaServerId(pushListItem.getMediaServerId()); sendRtpItem.setMediaServerId(pushListItem.getMediaServerId());
StreamPushItem transform = streamPushService.transform(pushListItem); StreamPushItem transform = streamPushService.transform(pushListItem);
transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); transform.setSelf(userSetting.getServerId().equals(pushListItem.getServerId()));
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);
// 开始推流 // 开始推流
sendPushStream(sendRtpItem, mediaServerItem, platform, request); sendPushStream(sendRtpItem, mediaServerItem, platform, request);
@ -657,9 +656,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
/** /**
* *
*/ */
private void sendProxyStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { private void sendProxyStream(SendRtpItem sendRtpItem, MediaServer mediaServerItem, ParentPlatform platform, SIPRequest request) {
Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
if (streamReady != null && streamReady) {
if (mediaInfo != null) {
// 自平台内容 // 自平台内容
int localPort = sendRtpPortManager.getNextPort(mediaServerItem); int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
@ -677,7 +677,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setStatus(1); sendRtpItem.setStatus(1);
sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt); SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform);
if (response != null) { if (response != null) {
sendRtpItem.setToTag(response.getToTag()); sendRtpItem.setToTag(response.getToTag());
} }
@ -685,11 +685,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
} }
private void sendPushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { private void sendPushStream(SendRtpItem sendRtpItem, MediaServer mediaServerItem, ParentPlatform platform, SIPRequest request) {
// 推流 // 推流
if (sendRtpItem.getServerId().equals(userSetting.getServerId())) { if (sendRtpItem.getServerId().equals(userSetting.getServerId())) {
Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
if (streamReady != null && streamReady) { if (mediaInfo != null ) {
// 自平台内容 // 自平台内容
int localPort = sendRtpPortManager.getNextPort(mediaServerItem); int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
if (localPort == 0) { if (localPort == 0) {
@ -703,7 +703,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
// 写入redis 超时时回复 // 写入redis 超时时回复
sendRtpItem.setStatus(1); sendRtpItem.setStatus(1);
SIPResponse response = sendStreamAck(request, sendRtpItem, platform); SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform);
if (response != null) { if (response != null) {
sendRtpItem.setToTag(response.getToTag()); sendRtpItem.setToTag(response.getToTag());
} }
@ -726,20 +726,19 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
/** /**
* 线 * 线
*/ */
private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServer mediaServerItem, ParentPlatform platform, SIPRequest request) {
// TODO 控制启用以使设备上线 // TODO 控制启用以使设备上线
logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", sendRtpItem.getApp(), sendRtpItem.getStream()); logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", sendRtpItem.getApp(), sendRtpItem.getStream());
// 监听流上线 // 监听流上线
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", mediaServerItem.getId()); Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), mediaServerItem.getId());
zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> { hookSubscribe.addSubscribe(hook, (hookData)->{
OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; logger.info("[上级点播]拉流代理已经就绪, {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
dynamicTask.stop(sendRtpItem.getCallId()); dynamicTask.stop(sendRtpItem.getCallId());
sendProxyStream(sendRtpItem, mediaServerItem, platform, request); sendProxyStream(sendRtpItem, mediaServerItem, platform, request);
}); });
dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
logger.info("[ app={}, stream={} ] 等待拉流代理流超时", sendRtpItem.getApp(), sendRtpItem.getStream()); logger.info("[ app={}, stream={} ] 等待拉流代理流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); hookSubscribe.removeSubscribe(hook);
}, userSetting.getPlatformPlayTimeout()); }, userSetting.getPlatformPlayTimeout());
boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream()); boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream());
if (!start) { if (!start) {
@ -748,7 +747,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
} }
zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); hookSubscribe.removeSubscribe(hook);
dynamicTask.stop(sendRtpItem.getCallId()); dynamicTask.stop(sendRtpItem.getCallId());
} }
} }
@ -756,7 +755,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
/** /**
* 线 * 线
*/ */
private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServer mediaServerItem, ParentPlatform platform, SIPRequest request) {
// 发送redis消息以使设备上线流上线后被 // 发送redis消息以使设备上线流上线后被
logger.info("[ app={}, stream={} ]通道未推流发送redis信息控制设备开始推流", sendRtpItem.getApp(), sendRtpItem.getStream()); logger.info("[ app={}, stream={} ]通道未推流发送redis信息控制设备开始推流", sendRtpItem.getApp(), sendRtpItem.getStream());
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
@ -860,7 +859,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);
} }
public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) { public SIPResponse sendStreamAck(MediaServer mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform) {
String sdpIp = sendRtpItem.getLocalIp(); String sdpIp = sendRtpItem.getLocalIp();
if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
@ -1006,7 +1005,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
logger.info("设备{}请求语音流,地址:{}:{}ssrc{}, {}", requesterId, addressStr, port, gb28181Sdp.getSsrc(), logger.info("设备{}请求语音流,地址:{}:{}ssrc{}, {}", requesterId, addressStr, port, gb28181Sdp.getSsrc(),
mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP"); mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP");
MediaServer mediaServerItem = broadcastCatch.getMediaServerItem(); MediaServer mediaServerItem = broadcastCatch.getMediaServer();
if (mediaServerItem == null) { if (mediaServerItem == null) {
logger.warn("未找到语音喊话使用的zlm"); logger.warn("未找到语音喊话使用的zlm");
try { try {

View File

@ -166,7 +166,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
}catch (Exception e) { }catch (Exception e) {
logger.error("[向上级转发移动位置失败] ", e); logger.error("[向上级转发移动位置失败] ", e);
} }
if (mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) || mobilePosition.getChannelId() == null) { if (mobilePosition.getChannelId() == null || mobilePosition.getChannelId().equals(mobilePosition.getDeviceId())) {
List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId()); List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId());
channels.forEach(channel -> { channels.forEach(channel -> {
// 发送redis消息。 通知位置信息的变化 // 发送redis消息。 通知位置信息的变化

View File

@ -147,4 +147,6 @@ public interface IMediaServerService {
SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId, SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
String app, String stream, String channelId, boolean tcp, boolean rtcp); String app, String stream, String channelId, boolean tcp, boolean rtcp);
MediaServer getMediaServerByAppAndStream(String app, String stream);
} }

View File

@ -898,4 +898,16 @@ public class MediaServerServiceImpl implements IMediaServerService {
sendRtpItem.setRtcp(rtcp); sendRtpItem.setRtcp(rtcp);
return sendRtpItem; return sendRtpItem;
} }
@Override
public MediaServer getMediaServerByAppAndStream(String app, String stream) {
List<MediaServer> mediaServerList = getAll();
for (MediaServer mediaServer : mediaServerList) {
MediaInfo mediaInfo = getMediaInfo(mediaServer, app, stream);
if (mediaInfo != null) {
return mediaServer;
}
}
return null;
}
} }

View File

@ -5,25 +5,22 @@ import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.bean.ResultForOnPublish; import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.media.*; import com.genersoft.iot.vmp.media.event.media.*;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerKeepaliveEvent; import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerKeepaliveEvent;
import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerStartEvent; import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerStartEvent;
import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@ -180,54 +177,6 @@ public class ZLMHttpHookListener {
if (mediaServer == null) { if (mediaServer == null) {
return new HookResultForOnPublish(0, "success"); return new HookResultForOnPublish(0, "success");
} }
// 推流鉴权的处理
if (!"rtp".equals(param.getApp())) {
StreamProxyItem stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (stream != null) {
HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
result.setEnable_audio(stream.isEnableAudio());
result.setEnable_mp4(stream.isEnableMp4());
return result;
}
if (userSetting.getPushAuthority()) {
// 推流鉴权
if (param.getParams() == null) {
logger.info("推流鉴权失败: 缺少必要参数sign=md5(user表的pushKey)");
return new HookResultForOnPublish(401, "Unauthorized");
}
Map<String, String> paramMap = urlParamToMap(param.getParams());
String sign = paramMap.get("sign");
if (sign == null) {
logger.info("推流鉴权失败: 缺少必要参数sign=md5(user表的pushKey)");
return new HookResultForOnPublish(401, "Unauthorized");
}
// 推流自定义播放鉴权码
String callId = paramMap.get("callId");
// 鉴权配置
boolean hasAuthority = userService.checkPushAuthority(callId, sign);
if (!hasAuthority) {
logger.info("推流鉴权失败: sign 无权限: callId={}. sign={}", callId, sign);
return new HookResultForOnPublish(401, "Unauthorized");
}
StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
streamAuthorityInfo.setCallId(callId);
streamAuthorityInfo.setSign(sign);
// 鉴权通过
redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
}
} else {
zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId());
}
HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
result.setEnable_audio(true);
taskExecutor.execute(() -> {
ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
if (subscribe != null) {
subscribe.response(mediaInfo, param);
}
});
ResultForOnPublish resultForOnPublish = mediaService.authenticatePublish(mediaServer, param.getApp(), param.getStream(), param.getParams()); ResultForOnPublish resultForOnPublish = mediaService.authenticatePublish(mediaServer, param.getApp(), param.getStream(), param.getParams());
if (resultForOnPublish != null) { if (resultForOnPublish != null) {
@ -264,220 +213,6 @@ public class ZLMHttpHookListener {
applicationEventPublisher.publishEvent(mediaDepartureEvent); applicationEventPublisher.publishEvent(mediaDepartureEvent);
} }
JSONObject json = (JSONObject) JSON.toJSON(param);
taskExecutor.execute(() -> {
ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId());
if (mediaInfo == null) {
logger.info("[ZLM HOOK] 流变化未找到ZLM, {}", param.getMediaServerId());
return;
}
if (subscribe != null) {
subscribe.response(mediaInfo, param);
}
List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks();
// TODO 重构此处逻辑
if (param.isRegist()) {
// 处理流注册的鉴权信息, 流注销这里不再删除鉴权信息,下次来了新的鉴权信息会对就的进行覆盖
if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
|| param.getOriginType() == OriginType.RTSP_PUSH.ordinal()
|| param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream());
if (streamAuthorityInfo == null) {
streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
} else {
streamAuthorityInfo.setOriginType(param.getOriginType());
streamAuthorityInfo.setOriginTypeStr(param.getOriginTypeStr());
}
redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
}
}
if ("rtsp".equals(param.getSchema())) {
logger.info("流变化:注册->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream());
if (param.isRegist()) {
mediaServerService.addCount(param.getMediaServerId());
} else {
mediaServerService.removeCount(param.getMediaServerId());
}
int updateStatusResult = streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream());
if (updateStatusResult > 0) {
}
if ("rtp".equals(param.getApp()) && !param.isRegist()) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
inviteStreamService.removeInviteInfo(inviteInfo);
storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
}
} else if ("broadcast".equals(param.getApp())) {
// 语音对讲推流 stream需要满足格式deviceId_channelId
if (param.getStream().indexOf("_") > 0) {
String[] streamArray = param.getStream().split("_");
if (streamArray.length == 2) {
String deviceId = streamArray[0];
String channelId = streamArray[1];
Device device = deviceService.getDevice(deviceId);
if (device != null) {
if (param.isRegist()) {
if (audioBroadcastManager.exit(deviceId, channelId)) {
playService.stopAudioBroadcast(deviceId, channelId);
}
// 开启语音对讲通道
try {
playService.audioBroadcastCmd(device, channelId, mediaInfo, param.getApp(), param.getStream(), 60, false, (msg) -> {
logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
});
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 语音对讲: {}", e.getMessage());
}
} else {
// 流注销
playService.stopAudioBroadcast(deviceId, channelId);
}
} else {
logger.info("[语音对讲] 未找到设备:{}", deviceId);
}
}
}
} else if ("talk".equals(param.getApp())) {
// 语音对讲推流 stream需要满足格式deviceId_channelId
if (param.getStream().indexOf("_") > 0) {
String[] streamArray = param.getStream().split("_");
if (streamArray.length == 2) {
String deviceId = streamArray[0];
String channelId = streamArray[1];
Device device = deviceService.getDevice(deviceId);
if (device != null) {
if (param.isRegist()) {
if (audioBroadcastManager.exit(deviceId, channelId)) {
playService.stopAudioBroadcast(deviceId, channelId);
}
// 开启语音对讲通道
playService.talkCmd(device, channelId, mediaInfo, param.getStream(), (msg) -> {
logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
});
} else {
// 流注销
playService.stopTalk(device, channelId, param.isRegist());
}
} else {
logger.info("[语音对讲] 未找到设备:{}", deviceId);
}
}
}
} else {
if (!"rtp".equals(param.getApp())) {
String type = OriginType.values()[param.getOriginType()].getType();
if (param.isRegist()) {
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(
param.getApp(), param.getStream());
String callId = null;
if (streamAuthorityInfo != null) {
callId = streamAuthorityInfo.getCallId();
}
StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaInfo,
param.getApp(), param.getStream(), tracks, callId);
param.setStreamInfo(new StreamContent(streamInfoByAppAndStream));
redisCatchStorage.addStream(mediaInfo, type, param.getApp(), param.getStream(), param);
if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal()
|| param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
|| param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
param.setSeverId(userSetting.getServerId());
zlmMediaListManager.addPush(param);
// 冗余数据,自己系统中自用
redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param);
}
} else {
// 兼容流注销时类型从redis记录获取
OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo(
param.getApp(), param.getStream(), param.getMediaServerId());
if (onStreamChangedHookParam != null) {
type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType();
redisCatchStorage.removeStream(mediaInfo.getId(), type, param.getApp(), param.getStream());
if ("PUSH".equalsIgnoreCase(type)) {
// 冗余数据,自己系统中自用
redisCatchStorage.removePushListItem(param.getApp(), param.getStream(), param.getMediaServerId());
}
}
GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
if (gbStream != null) {
// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
}
zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
}
GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
if (gbStream != null) {
if (userSetting.isUsePushingAsStatus()) {
eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist() ? CatalogEvent.ON : CatalogEvent.OFF);
}
}
if (type != null) {
// 发送流变化redis消息
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetting.getServerId());
jsonObject.put("app", param.getApp());
jsonObject.put("stream", param.getStream());
jsonObject.put("register", param.isRegist());
jsonObject.put("mediaServerId", param.getMediaServerId());
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
}
}
}
if (!param.isRegist()) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
if (sendRtpItem == null) {
continue;
}
if (sendRtpItem.getApp().equals(param.getApp())) {
logger.info(sendRtpItem.toString());
if (userSetting.getServerId().equals(sendRtpItem.getServerId())) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
sendRtpItem.getPlatformId(), null, userSetting.getServerId(), param.getMediaServerId());
// 通知其他wvp停止发流
redisCatchStorage.sendPushStreamClose(messageForPushChannel);
}else {
String platformId = sendRtpItem.getPlatformId();
ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
Device device = deviceService.getDevice(platformId);
try {
if (platform != null) {
commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStream());
} else {
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
|| sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
if (audioBroadcastCatch != null) {
// 来自上级平台的停止对讲
logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
}
}
}
} catch (SipException | InvalidArgumentException | ParseException |
SsrcTransactionNotFoundException e) {
logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
}
}
}
}
}
}
}
});
return HookResult.SUCCESS(); return HookResult.SUCCESS();
} }
@ -491,63 +226,6 @@ public class ZLMHttpHookListener {
logger.info("[ZLM HOOK]流无人观看:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), logger.info("[ZLM HOOK]流无人观看:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(),
param.getApp(), param.getStream()); param.getApp(), param.getStream());
JSONObject ret = new JSONObject(); JSONObject ret = new JSONObject();
ret.put("code", 0);
// 国标类型的流
if ("rtp".equals(param.getApp())) {
ret.put("close", userSetting.getStreamOnDemand());
// 国标流, 点播/录像回放/录像下载
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
// 点播
if (inviteInfo != null) {
// 录像下载
if (inviteInfo.getType() == InviteSessionType.DOWNLOAD) {
ret.put("close", false);
return ret;
}
// 收到无人观看说明流也没有在往上级推送
if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
inviteInfo.getChannelId());
if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
try {
commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStream());
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
}
}
}
}
Device device = deviceService.getDevice(inviteInfo.getDeviceId());
if (device != null) {
try {
// 多查询一次防止已经被处理了
InviteInfo info = inviteStreamService.getInviteInfo(inviteInfo.getType(),
inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
if (info != null) {
cmder.streamByeCmd(device, inviteInfo.getChannelId(),
inviteInfo.getStream(), null);
} else {
logger.info("[无人观看] 未找到设备的点播信息: {} 流:{}", inviteInfo.getDeviceId(), param.getStream());
}
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
logger.error("[无人观看]点播, 发送BYE失败 {}", e.getMessage());
}
} else {
logger.info("[无人观看] 未找到设备: {},流:{}", inviteInfo.getDeviceId(), param.getStream());
}
boolean close = mediaService.closeStreamOnNoneReader(param.getMediaServerId(), param.getApp(), param.getStream(), param.getSchema()); boolean close = mediaService.closeStreamOnNoneReader(param.getMediaServerId(), param.getApp(), param.getStream(), param.getSchema());
ret.put("code", close); ret.put("code", close);
return ret; return ret;
@ -609,22 +287,16 @@ public class ZLMHttpHookListener {
if (!"rtp".equals(param.getApp())) { if (!"rtp".equals(param.getApp())) {
return HookResult.SUCCESS(); return HookResult.SUCCESS();
} }
taskExecutor.execute(() -> {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
if (sendRtpItems.size() > 0) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
try { try {
commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId()); MediaSendRtpStoppedEvent event = new MediaSendRtpStoppedEvent(this);
} catch (SipException | InvalidArgumentException | ParseException e) { MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); if (mediaServerItem != null) {
event.setMediaServer(mediaServerItem);
applicationEventPublisher.publishEvent(event);
} }
redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), }catch (Exception e) {
sendRtpItem.getCallId(), sendRtpItem.getStream()); logger.info("[ZLM-HOOK-rtp发送关闭] 发送通知失败 ", e);
} }
}
});
return HookResult.SUCCESS(); return HookResult.SUCCESS();
} }

View File

@ -131,7 +131,11 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
param.put("ssrc", ssrc); param.put("ssrc", ssrc);
} }
JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaInfo, param); JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaInfo, param);
return (jsonObject != null && jsonObject.getInteger("code") == 0); if (jsonObject == null || jsonObject.getInteger("code") != 0 ) {
logger.error("停止发流失败: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param));
throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg"));
}
return true;
} }

View File

@ -265,7 +265,7 @@ public class ZLMServerFactory {
return result; return result;
} }
public JSONObject stopSendRtpStream(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem) { public JSONObject stopSendRtpStream(MediaServer mediaServerItem, SendRtpItem sendRtpItem) {
Map<String, Object> param = new HashMap<>(); Map<String, Object> param = new HashMap<>();
param.put("vhost", "__defaultVhost__"); param.put("vhost", "__defaultVhost__");
param.put("app", sendRtpItem.getApp()); param.put("app", sendRtpItem.getApp());

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
@ -10,14 +11,13 @@ import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@ -56,11 +56,7 @@ public class RedisRpcController {
private UserSetting userSetting; private UserSetting userSetting;
@Autowired @Autowired
private ZlmHttpHookSubscribe hookSubscribe; private HookSubscribe hookSubscribe;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired @Autowired
private RedisTemplate<Object, Object> redisTemplate; private RedisTemplate<Object, Object> redisTemplate;
@ -88,7 +84,7 @@ public class RedisRpcController {
} }
logger.info("[redis-rpc] 获取发流的信息: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); logger.info("[redis-rpc] 获取发流的信息: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
// 查询本级是否有这个流 // 查询本级是否有这个流
MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); MediaServer mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
if (mediaServerItem == null) { if (mediaServerItem == null) {
RedisRpcResponse response = request.getResponse(); RedisRpcResponse response = request.getResponse();
response.setStatusCode(200); response.setStatusCode(200);
@ -124,17 +120,17 @@ public class RedisRpcController {
SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class); SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class);
logger.info("[redis-rpc] 监听流上线: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); logger.info("[redis-rpc] 监听流上线: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
// 查询本级是否有这个流 // 查询本级是否有这个流
MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); MediaServer mediaServer = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
if (mediaServerItem != null) { if (mediaServer != null) {
logger.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); logger.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
// 读取redis中的上级点播信息生成sendRtpItm发送出去 // 读取redis中的上级点播信息生成sendRtpItm发送出去
if (sendRtpItem.getSsrc() == null) { if (sendRtpItem.getSsrc() == null) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式 // 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServer.getId()) : ssrcFactory.getPlayBackSsrc(mediaServer.getId());
sendRtpItem.setSsrc(ssrc); sendRtpItem.setSsrc(ssrc);
} }
sendRtpItem.setMediaServerId(mediaServerItem.getId()); sendRtpItem.setMediaServerId(mediaServer.getId());
sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); sendRtpItem.setLocalIp(mediaServer.getSdpIp());
sendRtpItem.setServerId(userSetting.getServerId()); sendRtpItem.setServerId(userSetting.getServerId());
redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
@ -143,19 +139,17 @@ public class RedisRpcController {
response.setStatusCode(200); response.setStatusCode(200);
} }
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null); hookSubscribe.addSubscribe(hook, (hookData) -> {
hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
logger.info("[redis-rpc] 监听流上线,流已上线: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); logger.info("[redis-rpc] 监听流上线,流已上线: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
// 读取redis中的上级点播信息生成sendRtpItm发送出去 // 读取redis中的上级点播信息生成sendRtpItm发送出去
if (sendRtpItem.getSsrc() == null) { if (sendRtpItem.getSsrc() == null) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式 // 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId()); String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(hookData.getMediaServer().getId()) : ssrcFactory.getPlayBackSsrc(hookData.getMediaServer().getId());
sendRtpItem.setSsrc(ssrc); sendRtpItem.setSsrc(ssrc);
} }
sendRtpItem.setMediaServerId(mediaServerItemInUse.getId()); sendRtpItem.setMediaServerId(hookData.getMediaServer().getId());
sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); sendRtpItem.setLocalIp(hookData.getMediaServer().getSdpIp());
sendRtpItem.setServerId(userSetting.getServerId()); sendRtpItem.setServerId(userSetting.getServerId());
redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
@ -177,8 +171,7 @@ public class RedisRpcController {
SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class); SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class);
logger.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); logger.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
hookSubscribe.removeSubscribe(hook); hookSubscribe.removeSubscribe(hook);
RedisRpcResponse response = request.getResponse(); RedisRpcResponse response = request.getResponse();
response.setStatusCode(200); response.setStatusCode(200);
@ -201,31 +194,31 @@ public class RedisRpcController {
return response; return response;
} }
logger.info("[redis-rpc] 开始发流: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); logger.info("[redis-rpc] 开始发流: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaServerItem == null) { if (mediaServer == null) {
logger.info("[redis-rpc] startSendRtp->未找到MediaServer {}", sendRtpItem.getMediaServerId() ); logger.info("[redis-rpc] startSendRtp->未找到MediaServer {}", sendRtpItem.getMediaServerId() );
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer"); WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer");
response.setBody(wvpResult); response.setBody(wvpResult);
return response; return response;
} }
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); if (mediaInfo != null) {
if (!streamReady) {
logger.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); logger.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "流不在线"); WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "流不在线");
response.setBody(wvpResult); response.setBody(wvpResult);
return response; return response;
} }
JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem); try {
if (jsonObject.getInteger("code") == 0) { mediaServerService.startSendRtp(mediaServer, null, sendRtpItem);
}catch (ControllerException exception) {
logger.info("[redis-rpc] 发流失败: {}/{}, 目标地址: {}{} {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getMsg());
WVPResult wvpResult = WVPResult.fail(exception.getCode(), exception.getMsg());
response.setBody(wvpResult);
return response;
}
logger.info("[redis-rpc] 发流成功: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); logger.info("[redis-rpc] 发流成功: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
WVPResult wvpResult = WVPResult.success(); WVPResult wvpResult = WVPResult.success();
response.setBody(wvpResult); response.setBody(wvpResult);
}else {
logger.info("[redis-rpc] 发流失败: {}/{}, 目标地址: {}{} {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), jsonObject);
WVPResult wvpResult = WVPResult.fail(jsonObject.getInteger("code"), jsonObject.getString("msg"));
response.setBody(wvpResult);
}
return response; return response;
} }
@ -244,25 +237,24 @@ public class RedisRpcController {
return response; return response;
} }
logger.info("[redis-rpc] 停止推流: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); logger.info("[redis-rpc] 停止推流: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaServerItem == null) { if (mediaServer == null) {
logger.info("[redis-rpc] stopSendRtp->未找到MediaServer {}", sendRtpItem.getMediaServerId() ); logger.info("[redis-rpc] stopSendRtp->未找到MediaServer {}", sendRtpItem.getMediaServerId() );
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer"); WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer");
response.setBody(wvpResult); response.setBody(wvpResult);
return response; return response;
} }
JSONObject jsonObject = zlmServerFactory.stopSendRtpStream(mediaServerItem, sendRtpItem); try {
if (jsonObject.getInteger("code") == 0) { mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
}catch (ControllerException exception) {
logger.info("[redis-rpc] 停止推流失败: {}/{}, 目标地址: {}{} code {}, msg: {}", sendRtpItem.getApp(),
sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getCode(), exception.getMsg() );
response.setBody(WVPResult.fail(exception.getCode(), exception.getMsg()));
return response;
}
logger.info("[redis-rpc] 停止推流成功: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); logger.info("[redis-rpc] 停止推流成功: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
response.setBody(WVPResult.success()); response.setBody(WVPResult.success());
return response; return response;
}else {
int code = jsonObject.getInteger("code");
String msg = jsonObject.getString("msg");
logger.info("[redis-rpc] 停止推流失败: {}/{}, 目标地址: {}{} code {}, msg: {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), code, msg );
response.setBody(WVPResult.fail(code, msg));
return response;
}
} }
/** /**

View File

@ -2,10 +2,7 @@ package com.genersoft.iot.vmp.storager;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.SystemAllInfo; import com.genersoft.iot.vmp.common.SystemAllInfo;
import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;

View File

@ -357,7 +357,8 @@ public interface DeviceChannelMapper {
"<if test='item.gpsTime != null'>, gps_time=#{item.gpsTime}</if>" + "<if test='item.gpsTime != null'>, gps_time=#{item.gpsTime}</if>" +
"<if test='item.streamIdentification != null'>, stream_identification=#{item.streamIdentification}</if>" + "<if test='item.streamIdentification != null'>, stream_identification=#{item.streamIdentification}</if>" +
"<if test='item.id > 0'>WHERE id=#{item.id}</if>" + "<if test='item.id > 0'>WHERE id=#{item.id}</if>" +
"<if test='item.id == 0'>WHERE device_id=#{item.deviceId} AND channel_id=#{item.channelId}</if>" + "<if test='item.id == 0 and item.channelId != null '>WHERE device_id=#{item.deviceId} AND channel_id=#{item.channelId}</if>" +
"<if test='item.id == 0 and item.channelId == null '>WHERE device_id=#{item.deviceId}</if>" +
"</foreach>" + "</foreach>" +
"</script>"}) "</script>"})
int batchUpdate(List<DeviceChannel> updateChannels); int batchUpdate(List<DeviceChannel> updateChannels);