From e2ebe9d6d87b1f3b6b47288ec660133ec1ef3a92 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 26 Mar 2024 11:42:38 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=B5=81=E5=8F=98=E5=8C=96?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../session/AudioBroadcastManager.java | 60 +++++ .../service/impl/MediaServerServiceImpl.java | 6 +- .../vmp/media/zlm/ZLMHttpHookListener.java | 215 +----------------- .../service/impl/InviteStreamServiceImpl.java | 8 +- .../vmp/service/impl/PlatformServiceImpl.java | 44 +++- .../iot/vmp/service/impl/PlayServiceImpl.java | 7 +- .../service/impl/StreamProxyServiceImpl.java | 2 +- .../service/impl/StreamPushServiceImpl.java | 41 +++- 8 files changed, 151 insertions(+), 232 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java index 343d2a6e..bc5f0223 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java @@ -1,11 +1,26 @@ package com.genersoft.iot.vmp.gb28181.session; import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; +import com.genersoft.iot.vmp.service.IDeviceService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; +import java.text.ParseException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -18,11 +33,56 @@ import java.util.stream.Stream; @Component public class AudioBroadcastManager { + private final static Logger logger = LoggerFactory.getLogger(AudioBroadcastManager.class); + @Autowired private SipConfig config; + @Autowired + private SIPCommander cmder; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private IDeviceService deviceService; + public static Map data = new ConcurrentHashMap<>(); + /** + * 流离开的处理 + */ + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaDepartureEvent event) { + List sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream()); + if (!sendRtpItems.isEmpty()) { + for (SendRtpItem sendRtpItem : sendRtpItems) { + if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) { + String platformId = sendRtpItem.getPlatformId(); + Device device = deviceService.getDevice(platformId); + try { + if (device != null) { + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), event.getStream(), sendRtpItem.getCallId()); + if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) + || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { + AudioBroadcastCatch audioBroadcastCatch = get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + if (audioBroadcastCatch != null) { + // 来自上级平台的停止对讲 + logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + } + } + } + } catch (SipException | InvalidArgumentException | ParseException | + SsrcTransactionNotFoundException e) { + logger.error("[命令发送失败] 发送BYE: {}", e.getMessage()); + } + } + } + } + } + public void update(AudioBroadcastCatch audioBroadcastCatch) { if (SipUtils.isFrontEnd(audioBroadcastCatch.getDeviceId())) { audioBroadcastCatch.setChannelId(audioBroadcastCatch.getDeviceId()); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index e4538e10..58f92f8c 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -15,8 +15,6 @@ import com.genersoft.iot.vmp.media.event.MediaServerDeleteEvent; import com.genersoft.iot.vmp.media.service.IMediaNodeServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; -import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -85,7 +83,7 @@ public class MediaServerServiceImpl implements IMediaServerService { public void onApplicationEvent(MediaArrivalEvent event) { if ("rtsp".equals(event.getSchema())) { logger.info("流变化:注册 app->{}, stream->{}", event.getApp(), event.getStream()); - addCount(event.getSeverId()); + addCount(event.getMediaServer().getId()); } } @@ -97,7 +95,7 @@ public class MediaServerServiceImpl implements IMediaServerService { public void onApplicationEvent(MediaDepartureEvent event) { if ("rtsp".equals(event.getSchema())) { logger.info("流变化:注销, app->{}, stream->{}", event.getApp(), event.getStream()); - removeCount(event.getSeverId()); + removeCount(event.getMediaServer().getId()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index ca526198..77ff9b01 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -4,13 +4,10 @@ import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; -import com.genersoft.iot.vmp.common.StreamInfo; -import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; @@ -18,12 +15,14 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; -import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.ResultForOnPublish; import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.HookType; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerKeepaliveEvent; import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerStartEvent; @@ -34,9 +33,6 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo; -import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo; -import com.genersoft.iot.vmp.vmanager.bean.StreamContent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -233,209 +229,6 @@ public class ZLMHttpHookListener { applicationEventPublisher.publishEvent(mediaArrivalEvent); } return HookResult.SUCCESS(); - - - - JSONObject json = (JSONObject) JSON.toJSON(param); - taskExecutor.execute(() -> { - ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json); - MediaServer mediaInfo = mediaServerService.getOne(param.getMediaServerId()); - if (mediaInfo == null) { - logger.info("[ZLM HOOK] 流变化未找到ZLM, {}", param.getMediaServerId()); - return; - } - if (subscribe != null) { - subscribe.response(mediaInfo, param); - } - - // TODO 重构此处逻辑 - if (param.isRegist()) { - // 处理流注册的鉴权信息, 流注销这里不再删除鉴权信息,下次来了新的鉴权信息会对就的进行覆盖 - if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal() - || param.getOriginType() == OriginType.RTSP_PUSH.ordinal() - || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { - StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream()); - if (streamAuthorityInfo == null) { - streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param); - } else { - streamAuthorityInfo.setOriginType(param.getOriginType()); - streamAuthorityInfo.setOriginTypeStr(param.getOriginTypeStr()); - } - redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo); - } - } - if ("rtsp".equals(param.getSchema())) { - logger.info("流变化:注册->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream()); - if (param.isRegist()) { - mediaServerService.addCount(param.getMediaServerId()); - } else { - mediaServerService.removeCount(param.getMediaServerId()); - } - - int updateStatusResult = streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream()); - if (updateStatusResult > 0) { - - } - - if ("rtp".equals(param.getApp()) && !param.isRegist()) { - InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream()); - if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) { - inviteStreamService.removeInviteInfo(inviteInfo); - storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); - } - } else if ("broadcast".equals(param.getApp())) { - // 语音对讲推流 stream需要满足格式deviceId_channelId - if (param.getStream().indexOf("_") > 0) { - String[] streamArray = param.getStream().split("_"); - if (streamArray.length == 2) { - String deviceId = streamArray[0]; - String channelId = streamArray[1]; - Device device = deviceService.getDevice(deviceId); - if (device != null) { - if (param.isRegist()) { - if (audioBroadcastManager.exit(deviceId, channelId)) { - playService.stopAudioBroadcast(deviceId, channelId); - } - // 开启语音对讲通道 - try { - playService.audioBroadcastCmd(device, channelId, mediaInfo, param.getApp(), param.getStream(), 60, false, (msg) -> { - logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId); - }); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[命令发送失败] 语音对讲: {}", e.getMessage()); - } - } else { - // 流注销 - playService.stopAudioBroadcast(deviceId, channelId); - } - } else { - logger.info("[语音对讲] 未找到设备:{}", deviceId); - } - } - } - } else if ("talk".equals(param.getApp())) { - // 语音对讲推流 stream需要满足格式deviceId_channelId - if (param.getStream().indexOf("_") > 0) { - String[] streamArray = param.getStream().split("_"); - if (streamArray.length == 2) { - String deviceId = streamArray[0]; - String channelId = streamArray[1]; - Device device = deviceService.getDevice(deviceId); - if (device != null) { - if (param.isRegist()) { - if (audioBroadcastManager.exit(deviceId, channelId)) { - playService.stopAudioBroadcast(deviceId, channelId); - } - // 开启语音对讲通道 - playService.talkCmd(device, channelId, mediaInfo, param.getStream(), (msg) -> { - logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId); - }); - } else { - // 流注销 - playService.stopTalk(device, channelId, param.isRegist()); - } - } else { - logger.info("[语音对讲] 未找到设备:{}", deviceId); - } - } - } - - } else { - if (!"rtp".equals(param.getApp())) { - String type = OriginType.values()[param.getOriginType()].getType(); - if (param.isRegist()) { - StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo( - param.getApp(), param.getStream()); - String callId = null; - if (streamAuthorityInfo != null) { - callId = streamAuthorityInfo.getCallId(); - } - StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaInfo, - param.getApp(), param.getStream(), MediaInfo.getInstance(param), callId); - param.setStreamInfo(new StreamContent(streamInfoByAppAndStream)); - redisCatchStorage.addStream(mediaInfo, type, param.getApp(), param.getStream(), param); - if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal() - || param.getOriginType() == OriginType.RTMP_PUSH.ordinal() - || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { - param.setSeverId(userSetting.getServerId()); - zlmMediaListManager.addPush(param); - - // 冗余数据,自己系统中自用 - redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param); - } - } else { - // 兼容流注销时类型从redis记录获取 - OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo( - param.getApp(), param.getStream(), param.getMediaServerId()); - if (onStreamChangedHookParam != null) { - type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType(); - redisCatchStorage.removeStream(mediaInfo.getId(), type, param.getApp(), param.getStream()); - if ("PUSH".equalsIgnoreCase(type)) { - // 冗余数据,自己系统中自用 - redisCatchStorage.removePushListItem(param.getApp(), param.getStream(), param.getMediaServerId()); - } - } - GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); - if (gbStream != null) { -// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); - } - zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); - } - GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); - if (gbStream != null) { - if (userSetting.isUsePushingAsStatus()) { - eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist() ? CatalogEvent.ON : CatalogEvent.OFF); - } - } - if (type != null) { - // 发送流变化redis消息 - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", param.getApp()); - jsonObject.put("stream", param.getStream()); - jsonObject.put("register", param.isRegist()); - jsonObject.put("mediaServerId", param.getMediaServerId()); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); - } - } - } - if (!param.isRegist()) { - List sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); - if (!sendRtpItems.isEmpty()) { - for (SendRtpItem sendRtpItem : sendRtpItems) { - if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) { - String platformId = sendRtpItem.getPlatformId(); - ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); - Device device = deviceService.getDevice(platformId); - - try { - if (platform != null) { - commanderFroPlatform.streamByeCmd(platform, sendRtpItem); - redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), - sendRtpItem.getCallId(), sendRtpItem.getStream()); - } else { - cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); - if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) - || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - if (audioBroadcastCatch != null) { - // 来自上级平台的停止对讲 - logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - } - } - } - } catch (SipException | InvalidArgumentException | ParseException | - SsrcTransactionNotFoundException e) { - logger.error("[命令发送失败] 发送BYE: {}", e.getMessage()); - } - } - } - } - } - } - }); - return HookResult.SUCCESS(); } /** diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java index 014c7616..360e29c0 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java @@ -45,9 +45,9 @@ public class InviteStreamServiceImpl implements IInviteStreamService { @Async("taskExecutor") @org.springframework.context.event.EventListener public void onApplicationEvent(MediaArrivalEvent event) { - if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) { - - } +// if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) { +// +// } } /** @@ -60,7 +60,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService { InviteInfo inviteInfo = getInviteInfoByStream(null, event.getStream()); if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) { removeInviteInfo(inviteInfo); - stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); + storage.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index 4de8420e..54e06c20 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -1,9 +1,9 @@ package com.genersoft.iot.vmp.service.impl; +import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionStatus; import com.genersoft.iot.vmp.common.InviteSessionType; -import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; @@ -12,18 +12,20 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.IInviteStreamService; -import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlatformService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.dao.*; +import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; +import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; import com.genersoft.iot.vmp.utils.DateUtil; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; @@ -31,6 +33,8 @@ import gov.nist.javax.sip.message.SIPResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import javax.sdp.*; @@ -38,10 +42,6 @@ import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; import javax.sip.SipException; import java.text.ParseException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; import java.util.*; /** @@ -101,6 +101,34 @@ public class PlatformServiceImpl implements IPlatformService { private IInviteStreamService inviteStreamService; + /** + * 流离开的处理 + */ + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaDepartureEvent event) { + List sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream()); + if (!sendRtpItems.isEmpty()) { + for (SendRtpItem sendRtpItem : sendRtpItems) { + if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) { + String platformId = sendRtpItem.getPlatformId(); + ParentPlatform platform = platformMapper.getParentPlatByServerGBId(platformId); + + try { + if (platform != null) { + commanderForPlatform.streamByeCmd(platform, sendRtpItem); + redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), + sendRtpItem.getCallId(), sendRtpItem.getStream()); + } + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 发送BYE: {}", e.getMessage()); + } + } + } + } + } + + @Override public ParentPlatform queryPlatformByServerGBId(String platformGbId) { return platformMapper.getParentPlatByServerGBId(platformGbId); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 65cf6936..722139e9 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -23,11 +23,13 @@ import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRecordMp4; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; @@ -188,7 +190,6 @@ public class PlayServiceImpl implements IPlayService { }else if ("talk".equals(event.getApp())) { stopTalk(device, channelId, false); } - } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index dd0eae2b..02c5b30d 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -124,7 +124,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @EventListener public void onApplicationEvent(MediaDepartureEvent event) { if ("rtsp".equals(event.getSchema())) { - updateStatus(true, event.getApp(), event.getStream()); + updateStatus(false, event.getApp(), event.getStream()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index c3128e82..dd0e8854 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -126,6 +126,7 @@ public class StreamPushServiceImpl implements IStreamPushService { streamPushMapper.update(transform); gbStreamMapper.updateMediaServer(event.getApp(), event.getStream(), event.getMediaServer().getId()); } + // TODO 相关的事件自行管理,不需要写入ZLMMediaListManager // ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream()); // if ( channelOnlineEventLister != null) { // try { @@ -137,6 +138,15 @@ public class StreamPushServiceImpl implements IStreamPushService { // } // 冗余数据,自己系统中自用 redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event); + + // 发送流变化redis消息 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetting.getServerId()); + jsonObject.put("app", event.getApp()); + jsonObject.put("stream", event.getStream()); + jsonObject.put("register", true); + jsonObject.put("mediaServerId", event.getMediaServer().getId()); + redisCatchStorage.sendStreamChangeMsg(OriginType.values()[event.getMediaInfo().getOriginType()].getType(), jsonObject); } /** @@ -145,7 +155,36 @@ public class StreamPushServiceImpl implements IStreamPushService { @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaDepartureEvent event) { - + // 兼容流注销时类型从redis记录获取 + OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo( + event.getApp(), event.getStream(), event.getMediaServer().getId()); + if (onStreamChangedHookParam != null) { + String type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType(); + redisCatchStorage.removeStream(event.getMediaServer().getId(), type, event.getApp(), event.getStream()); + if ("PUSH".equalsIgnoreCase(type)) { + // 冗余数据,自己系统中自用 + redisCatchStorage.removePushListItem(event.getApp(), event.getStream(), event.getMediaServer().getId()); + } + if (type != null) { + // 发送流变化redis消息 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetting.getServerId()); + jsonObject.put("app", event.getApp()); + jsonObject.put("stream", event.getStream()); + jsonObject.put("register", false); + jsonObject.put("mediaServerId", event.getMediaServer().getId()); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + } + } + GbStream gbStream = storager.getGbStream(event.getApp(), event.getStream()); + if (gbStream != null) { + if (userSetting.isUsePushingAsStatus()) { + storager.mediaOffline(event.getApp(), event.getStream()); + eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); + } + }else { + storager.removeMedia(event.getApp(), event.getStream()); + } }