diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index f386c33e..299d59ac 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -6,8 +6,8 @@ import com.genersoft.iot.vmp.gb28181.event.device.RequestTimeoutEvent; import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent; -import com.genersoft.iot.vmp.media.event.MediaServerOfflineEvent; -import com.genersoft.iot.vmp.media.event.MediaServerOnlineEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Component; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java index f5385d4a..24eadbac 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java @@ -1,26 +1,13 @@ package com.genersoft.iot.vmp.gb28181.session; import com.genersoft.iot.vmp.conf.SipConfig; -import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; -import com.genersoft.iot.vmp.service.IDeviceService; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.event.EventListener; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; -import javax.sip.InvalidArgumentException; -import javax.sip.SipException; -import java.text.ParseException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 14d496d5..1711938a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -14,12 +14,11 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeForStreamPush; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -279,11 +278,11 @@ public class SIPCommander implements ISIPCommander { } logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getSdpIp(), ssrcInfo.getPort()); - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); - subscribe.addSubscribe(hookSubscribe, (MediaServer mediaServerItemInUse, HookParam hookParam) -> { + Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServerItem.getId()); + subscribe.addSubscribe(rtpHook, (hookData) -> { if (event != null) { - event.response(mediaServerItemInUse, hookParam); - subscribe.removeSubscribe(hookSubscribe); + event.response(hookData); + subscribe.removeSubscribe(rtpHook); } }); String sdpIp; @@ -453,13 +452,13 @@ public class SIPCommander implements ISIPCommander { //ssrc content.append("y=" + ssrcInfo.getSsrc() + "\r\n"); - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); + Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream(), mediaServerItem.getId()); // 添加订阅 - subscribe.addSubscribe(hookSubscribe, (MediaServer mediaServerItemInUse, HookParam hookParam) -> { + subscribe.addSubscribe(rtpHook, (hookData) -> { if (hookEvent != null) { - hookEvent.response(mediaServerItemInUse, hookParam); + hookEvent.response(hookData); } - subscribe.removeSubscribe(hookSubscribe); + subscribe.removeSubscribe(rtpHook); }); Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc()); @@ -554,19 +553,18 @@ public class SIPCommander implements ISIPCommander { content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc logger.debug("此时请求下载信令的ssrc===>{}",ssrcInfo.getSsrc()); - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); + Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream(), mediaServerItem.getId()); // 添加订阅 CallIdHeader newCallIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport()); String callId= newCallIdHeader.getCallId(); - subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { + subscribe.addSubscribe(rtpHook, (hookData) -> { logger.debug("sipc 添加订阅===callId {}",callId); - hookEvent.response(mediaServerItemInUse, hookParam); - subscribe.removeSubscribe(hookSubscribe); - hookSubscribe.getContent().put("regist", false); - hookSubscribe.getContent().put("schema", "rtsp"); + hookEvent.response(hookData); + subscribe.removeSubscribe(rtpHook); // 添加流注销的订阅,注销了后向设备发送bye - subscribe.addSubscribe(hookSubscribe, - (mediaServerItemForEnd, hookParam1) -> { + Hook departureHook = Hook.getInstance(HookType.on_media_departure, "rtp", ssrcInfo.getStream(), mediaServerItem.getId()); + subscribe.addSubscribe(departureHook, + (departureHookData) -> { logger.info("[录像]下载结束, 发送BYE"); try { streamByeCmd(device, channelId, ssrcInfo.getStream(), callId); @@ -604,20 +602,20 @@ public class SIPCommander implements ISIPCommander { } logger.info("[语音喊话] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), sendRtpItem.getPort()); - HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); - subscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItemInUse, hookParam) -> { + Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServerItem.getId()); + subscribe.addSubscribe(hook, (hookData) -> { if (event != null) { - event.response(mediaServerItemInUse, hookParam); - subscribe.removeSubscribe(hookSubscribeForStreamChange); + event.response(hookData); + subscribe.removeSubscribe(hook); } }); CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport()); callIdHeader.setCallId(callId); - HookSubscribeForStreamPush hookSubscribeForStreamPush = HookSubscribeFactory.on_publish("rtp", stream, null, mediaServerItem.getId()); - subscribe.addSubscribe(hookSubscribeForStreamPush, (mediaServerItemInUse, hookParam) -> { + Hook publishHook = Hook.getInstance(HookType.on_publish, "rtp", stream, mediaServerItem.getId()); + subscribe.addSubscribe(publishHook, (hookData) -> { if (eventForPush != null) { - eventForPush.response(mediaServerItemInUse, hookParam); + eventForPush.response(hookData); } }); // @@ -1260,7 +1258,6 @@ public class SIPCommander implements ISIPCommander { * @param startPriority 报警起始级别(可选) * @param endPriority 报警终止级别(可选) * @param alarmMethod 报警方式条件(可选) - * @param alarmType 报警类型 * @param startTime 报警发生起始时间(可选) * @param endTime 报警发生终止时间(可选) * @return true = 命令发送成功 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 441e0f0a..21aba6c4 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -13,9 +13,9 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; @@ -905,11 +905,11 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); - subscribe.addSubscribe(hookSubscribe, (MediaServer mediaServerItemInUse, HookParam hookParam) -> { + Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServerItem.getId()); + subscribe.addSubscribe(hook, (hookData) -> { if (event != null) { - event.response(mediaServerItemInUse, hookParam); - subscribe.removeSubscribe(hookSubscribe); + event.response(hookData); + subscribe.removeSubscribe(hook); } }); String sdpIp = mediaServerItem.getSdpIp(); @@ -949,7 +949,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> { streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - subscribe.removeSubscribe(hookSubscribe); + subscribe.removeSubscribe(hook); errorEvent.response(e); }), e -> { ResponseEvent responseEvent = (ResponseEvent) e.event; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index a8c03d7b..9f4e4704 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -18,8 +18,8 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; @@ -115,7 +115,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements private IMediaServerService mediaServerService; @Autowired - private HookSubscribe zlmHttpHookSubscribe; + private HookSubscribe hookSubscribe; @Autowired private SIPProcessorObserver sipProcessorObserver; @@ -723,17 +723,16 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // TODO 控制启用以使设备上线 logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream()); // 监听流上线 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId()); - zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> { - OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; - logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); + Hook hook = Hook.getInstance(HookType.on_media_arrival, gbStream.getApp(), gbStream.getStream(), mediaServerItem.getId()); + this.hookSubscribe.addSubscribe(hook, (hookData) -> { + logger.info("[上级点播]拉流代理已经就绪, {}/{}", hookData.getApp(), hookData.getStream()); dynamicTask.stop(callIdHeader.getCallId()); pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); }); dynamicTask.startDelay(callIdHeader.getCallId(), () -> { logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream()); - zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); + this.hookSubscribe.removeSubscribe(hook); }, userSetting.getPlatformPlayTimeout()); boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); if (!start) { @@ -742,7 +741,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); } - zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); + this.hookSubscribe.removeSubscribe(hook); dynamicTask.stop(callIdHeader.getCallId()); } } else if ("push".equals(gbStream.getStreamType())) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java index 57b76829..72ba51af 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java @@ -127,8 +127,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp // 消息发送成功, 向上级发送invite,获取推流 try { - platformService.broadcastInvite(platform, deviceChannel.getChannelId(), mediaServerForMinimumLoad, (mediaServerItem, hookParam)->{ - OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; + platformService.broadcastInvite(platform, deviceChannel.getChannelId(), mediaServerForMinimumLoad, (hookData)->{ // 上级平台推流成功 AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(device.getDeviceId(), targetId); if (broadcastCatch != null ) { @@ -136,20 +135,20 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp logger.info("[国标级联] 语音喊话 设备正在使用中 platform: {}, channel: {}", platform.getServerGBId(), deviceChannel.getChannelId()); // 查看语音通道已经建立且已经占用 回复BYE - platformService.stopBroadcast(platform, deviceChannel, streamChangedHookParam.getStream(), true, mediaServerItem); + platformService.stopBroadcast(platform, deviceChannel, hookData.getStream(), true, hookData.getMediaServer()); }else { // 查看语音通道已经建立但是未占用 - broadcastCatch.setApp(streamChangedHookParam.getApp()); - broadcastCatch.setStream(streamChangedHookParam.getStream()); - broadcastCatch.setMediaServerItem(mediaServerItem); + broadcastCatch.setApp(hookData.getApp()); + broadcastCatch.setStream(hookData.getStream()); + broadcastCatch.setMediaServerItem(hookData.getMediaServer()); audioBroadcastManager.update(broadcastCatch); // 推流到设备 - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, streamChangedHookParam.getStream(), null); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, hookData.getStream(), null); if (sendRtpItem == null) { - logger.warn("[国标级联] 语音喊话 异常,未找到发流信息, channelId: {}, stream: {}", targetId, streamChangedHookParam.getStream()); - logger.info("[国标级联] 语音喊话 重新开始,channelId: {}, stream: {}", targetId, streamChangedHookParam.getStream()); + logger.warn("[国标级联] 语音喊话 异常,未找到发流信息, channelId: {}, stream: {}", targetId, hookData.getStream()); + logger.info("[国标级联] 语音喊话 重新开始,channelId: {}, stream: {}", targetId, hookData.getStream()); try { - playService.audioBroadcastCmd(device, targetId, mediaServerItem, streamChangedHookParam.getApp(), streamChangedHookParam.getStream(), 60, true, msg -> { + playService.audioBroadcastCmd(device, targetId, hookData.getMediaServer(), hookData.getApp(), hookData.getStream(), 60, true, msg -> { logger.info("[语音喊话] 通道建立成功, device: {}, channel: {}", device.getDeviceId(), targetId); }); } catch (SipException | InvalidArgumentException | ParseException e) { @@ -157,7 +156,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp } }else { // 发流 - JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem); + JSONObject jsonObject = zlmServerFactory.startSendRtp(hookData.getMediaServer(), sendRtpItem); if (jsonObject != null && jsonObject.getInteger("code") == 0 ) { logger.info("[语音喊话] 自动推流成功, device: {}, channel: {}", device.getDeviceId(), targetId); }else { @@ -167,7 +166,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp } }else { try { - playService.audioBroadcastCmd(device, targetId, mediaServerItem, streamChangedHookParam.getApp(), streamChangedHookParam.getStream(), 60, true, msg -> { + playService.audioBroadcastCmd(device, targetId, hookData.getMediaServer(), hookData.getApp(), hookData.getStream(), 60, true, msg -> { logger.info("[语音喊话] 通道建立成功, device: {}, channel: {}", device.getDeviceId(), targetId); }); } catch (SipException | InvalidArgumentException | ParseException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java index b11b9a8f..f8ff69cf 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java @@ -13,9 +13,9 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; +import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -106,9 +106,8 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i logger.error("[录像流]推送完毕,收到关流通知, 发送BYE失败 {}", e.getMessage()); } // 去除监听流注销自动停止下载的监听 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcTransaction.getStream(), false, "rtsp", ssrcTransaction.getMediaServerId()); - subscribe.removeSubscribe(hookSubscribe); - + Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcTransaction.getStream(), ssrcTransaction.getMediaServerId()); + subscribe.removeSubscribe(hook); // 如果级联播放,需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题,需要将点播CallId进行上下级绑定 SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, ssrcTransaction.getChannelId(), null, null); if (sendRtpItem != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java b/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java index 942f63a9..4eb10412 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java @@ -1,7 +1,7 @@ package com.genersoft.iot.vmp.media; import com.genersoft.iot.vmp.conf.MediaConfig; -import com.genersoft.iot.vmp.media.event.MediaServerChangeEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import org.slf4j.Logger; diff --git a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaServerStatusManger.java b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaServerStatusManger.java index dcb46554..62015dc4 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaServerStatusManger.java +++ b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaServerStatusManger.java @@ -8,8 +8,8 @@ import com.genersoft.iot.vmp.media.abl.bean.AblServerConfig; import com.genersoft.iot.vmp.media.abl.bean.ConfigKeyId; import com.genersoft.iot.vmp.media.abl.event.HookAblServerKeepaliveEvent; import com.genersoft.iot.vmp.media.abl.event.HookAblServerStartEvent; -import com.genersoft.iot.vmp.media.event.MediaServerChangeEvent; -import com.genersoft.iot.vmp.media.event.MediaServerDeleteEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerDeleteEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import org.slf4j.Logger; @@ -20,11 +20,8 @@ import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; -import java.io.File; import java.lang.reflect.Field; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; diff --git a/src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java b/src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java index 2cc9943e..e753fb6c 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.media.bean; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import io.swagger.v3.oas.annotations.media.Schema; @@ -12,6 +13,15 @@ import java.util.List; */ @Schema(description = "视频信息") public class MediaInfo { + @Schema(description = "应用名") + private String app; + @Schema(description = "流ID") + private String stream; + @Schema(description = "流媒体节点") + private MediaServer mediaServer; + @Schema(description = "协议") + private String schema; + @Schema(description = "观看人数") private Integer readerCount; @Schema(description = "视频编码类型") @@ -37,8 +47,15 @@ public class MediaInfo { @Schema(description = "数据产生速度,单位byte/s") private Long bytesSpeed; - public static MediaInfo getInstance(JSONObject jsonObject) { + public static MediaInfo getInstance(JSONObject jsonObject, MediaServer mediaServer) { MediaInfo mediaInfo = new MediaInfo(); + mediaInfo.setMediaServer(mediaServer); + String app = jsonObject.getString("app"); + mediaInfo.setApp(app); + String stream = jsonObject.getString("stream"); + mediaInfo.setStream(stream); + String schema = jsonObject.getString("schema"); + mediaInfo.setSchema(schema); Integer totalReaderCount = jsonObject.getInteger("totalReaderCount"); Boolean online = jsonObject.getBoolean("online"); Integer originType = jsonObject.getInteger("originType"); @@ -110,9 +127,13 @@ public class MediaInfo { return mediaInfo; } - public static MediaInfo getInstance(OnStreamChangedHookParam param) { + public static MediaInfo getInstance(OnStreamChangedHookParam param, MediaServer mediaServer) { List tracks = param.getTracks(); MediaInfo mediaInfo = new MediaInfo(); + mediaInfo.setApp(param.getApp()); + mediaInfo.setStream(param.getStream()); + mediaInfo.setSchema(param.getSchema()); + mediaInfo.setMediaServer(mediaServer); mediaInfo.setReaderCount(param.getTotalReaderCount()); mediaInfo.setOnline(param.isRegist()); mediaInfo.setOriginType(param.getOriginType()); @@ -247,4 +268,36 @@ public class MediaInfo { public void setBytesSpeed(Long bytesSpeed) { this.bytesSpeed = bytesSpeed; } + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public MediaServer getMediaServer() { + return mediaServer; + } + + public void setMediaServer(MediaServer mediaServer) { + this.mediaServer = mediaServer; + } + + public String getSchema() { + return schema; + } + + public void setSchema(String schema) { + this.schema = schema; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/bean/RecordInfo.java b/src/main/java/com/genersoft/iot/vmp/media/bean/RecordInfo.java new file mode 100644 index 00000000..aafc5db1 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/bean/RecordInfo.java @@ -0,0 +1,92 @@ +package com.genersoft.iot.vmp.media.bean; + +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam; + +public class RecordInfo { + private String fileName; + private String filePath; + private long fileSize; + private String folder; + private String url; + private long startTime; + private double timeLen; + + public static RecordInfo getInstance(OnRecordMp4HookParam hookParam) { + RecordInfo recordInfo = new RecordInfo(); + recordInfo.setFileName(hookParam.getFile_name()); + recordInfo.setUrl(hookParam.getUrl()); + recordInfo.setFolder(hookParam.getFolder()); + recordInfo.setFilePath(hookParam.getFile_path()); + recordInfo.setFileSize(hookParam.getFile_size()); + recordInfo.setStartTime(hookParam.getStart_time()); + recordInfo.setTimeLen(hookParam.getTime_len()); + return recordInfo; + } + + public String getFileName() { + return fileName; + } + + public void setFileName(String fileName) { + this.fileName = fileName; + } + + public String getFilePath() { + return filePath; + } + + public void setFilePath(String filePath) { + this.filePath = filePath; + } + + public long getFileSize() { + return fileSize; + } + + public void setFileSize(long fileSize) { + this.fileSize = fileSize; + } + + public String getFolder() { + return folder; + } + + public void setFolder(String folder) { + this.folder = folder; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public double getTimeLen() { + return timeLen; + } + + public void setTimeLen(double timeLen) { + this.timeLen = timeLen; + } + + @Override + public String toString() { + return "RecordInfo{" + + "文件名称='" + fileName + '\'' + + ", 文件路径='" + filePath + '\'' + + ", 文件大小=" + fileSize + + ", 开始时间=" + startTime + + ", 时长=" + timeLen + + '}'; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/MediaNotFoundEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/MediaNotFoundEvent.java deleted file mode 100644 index 2f8a8405..00000000 --- a/src/main/java/com/genersoft/iot/vmp/media/event/MediaNotFoundEvent.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.genersoft.iot.vmp.media.event; - -import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamNotFoundHookParam; -import org.springframework.context.ApplicationEvent; - -/** - * 流未找到 - */ -public class MediaNotFoundEvent extends ApplicationEvent { - public MediaNotFoundEvent(Object source) { - super(source); - } - - private String app; - - private String stream; - - private MediaServer mediaServer; - - private String schema; - - public static MediaNotFoundEvent getInstance(Object source, OnStreamNotFoundHookParam hookParam, MediaServer mediaServer){ - MediaNotFoundEvent mediaDepartureEven = new MediaNotFoundEvent(source); - mediaDepartureEven.setApp(hookParam.getApp()); - mediaDepartureEven.setStream(hookParam.getStream()); - mediaDepartureEven.setSchema(hookParam.getSchema()); - mediaDepartureEven.setMediaServer(mediaServer); - return mediaDepartureEven; - } - - public String getApp() { - return app; - } - - public void setApp(String app) { - this.app = app; - } - - public String getStream() { - return stream; - } - - public void setStream(String stream) { - this.stream = stream; - } - - public MediaServer getMediaServer() { - return mediaServer; - } - - public void setMediaServer(MediaServer mediaServer) { - this.mediaServer = mediaServer; - } - - public String getSchema() { - return schema; - } - - public void setSchema(String schema) { - this.schema = schema; - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/MediaRtpServerTimeoutEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/MediaRtpServerTimeoutEvent.java deleted file mode 100644 index ca01142a..00000000 --- a/src/main/java/com/genersoft/iot/vmp/media/event/MediaRtpServerTimeoutEvent.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.genersoft.iot.vmp.media.event; - -import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamNotFoundHookParam; -import org.springframework.context.ApplicationEvent; - -/** - * rtp 服务收流超时通知 - */ -public class MediaRtpServerTimeoutEvent extends ApplicationEvent { - public MediaRtpServerTimeoutEvent(Object source) { - super(source); - } - - private String app; - - private String stream; - - private MediaServer mediaServer; - - public static MediaRtpServerTimeoutEvent getInstance(Object source, OnStreamNotFoundHookParam hookParam, MediaServer mediaServer){ - MediaRtpServerTimeoutEvent mediaDepartureEven = new MediaRtpServerTimeoutEvent(source); - mediaDepartureEven.setApp(hookParam.getApp()); - mediaDepartureEven.setStream(hookParam.getStream()); - mediaDepartureEven.setMediaServer(mediaServer); - return mediaDepartureEven; - } - - public String getApp() { - return app; - } - - public void setApp(String app) { - this.app = app; - } - - public String getStream() { - return stream; - } - - public void setStream(String stream) { - this.stream = stream; - } - - public MediaServer getMediaServer() { - return mediaServer; - } - - public void setMediaServer(MediaServer mediaServer) { - this.mediaServer = mediaServer; - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/Hook.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/Hook.java new file mode 100755 index 00000000..a0dc7c32 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/event/hook/Hook.java @@ -0,0 +1,86 @@ +package com.genersoft.iot.vmp.media.event.hook; + +/** + * zlm hook事件的参数 + * @author lin + */ +public class Hook { + + private HookType hookType; + + private String app; + + private String stream; + + private String mediaServerId; + + private Long createTime; + + public static Hook getInstance(HookType hookType, String app, String stream, String mediaServerId) { + Hook hookSubscribe = new Hook(); + hookSubscribe.setApp(app); + hookSubscribe.setStream(stream); + hookSubscribe.setHookType(hookType); + hookSubscribe.setMediaServerId(mediaServerId); + hookSubscribe.setCreateTime(System.currentTimeMillis()); + return hookSubscribe; + } + + public HookType getHookType() { + return hookType; + } + + public void setHookType(HookType hookType) { + this.hookType = hookType; + } + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public Long getCreateTime() { + return createTime; + } + + public void setCreateTime(Long createTime) { + this.createTime = createTime; + } + + public String getMediaServerId() { + return mediaServerId; + } + + public void setMediaServerId(String mediaServerId) { + this.mediaServerId = mediaServerId; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof Hook) { + Hook param = (Hook) obj; + return param.getHookType().equals(this.hookType) + && param.getApp().equals(this.app) + && param.getStream().equals(this.stream) + && param.getMediaServerId().equals(this.mediaServerId); + }else { + return false; + } + } + + @Override + public String toString() { + return this.getHookType() + this.getApp() + this.getStream() + this.getMediaServerId(); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookData.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookData.java new file mode 100644 index 00000000..25c02de5 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookData.java @@ -0,0 +1,132 @@ +package com.genersoft.iot.vmp.media.event.hook; + +import com.genersoft.iot.vmp.media.bean.MediaInfo; +import com.genersoft.iot.vmp.media.bean.RecordInfo; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.event.media.MediaEvent; +import com.genersoft.iot.vmp.media.event.media.MediaPublishEvent; +import com.genersoft.iot.vmp.media.event.media.MediaRecordMp4Event; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; +import io.swagger.v3.oas.annotations.media.Schema; + +/** + * Hook返回的内容 + */ +public class HookData { + /** + * 应用名 + */ + private String app; + /** + * 流ID + */ + private String stream; + /** + * 流媒体节点 + */ + private MediaServer mediaServer; + /** + * 协议 + */ + private String schema; + + /** + * 流信息 + */ + private MediaInfo mediaInfo; + + /** + * 录像信息 + */ + private RecordInfo recordInfo; + + @Schema(description = "推流的额外参数") + private String params; + public static HookData getInstance(MediaEvent mediaEvent) { + HookData hookData = new HookData(); + if (mediaEvent instanceof MediaPublishEvent) { + MediaPublishEvent event = (MediaPublishEvent) mediaEvent; + hookData.setApp(event.getApp()); + hookData.setStream(event.getStream()); + hookData.setSchema(event.getSchema()); + hookData.setMediaServer(event.getMediaServer()); + hookData.setParams(event.getParams()); + }else if (mediaEvent instanceof MediaArrivalEvent) { + MediaArrivalEvent event = (MediaArrivalEvent) mediaEvent; + hookData.setApp(event.getApp()); + hookData.setStream(event.getStream()); + hookData.setSchema(event.getSchema()); + hookData.setMediaServer(event.getMediaServer()); + hookData.setMediaInfo(event.getMediaInfo()); + }else if (mediaEvent instanceof MediaRecordMp4Event) { + MediaRecordMp4Event event = (MediaRecordMp4Event) mediaEvent; + hookData.setApp(event.getApp()); + hookData.setStream(event.getStream()); + hookData.setSchema(event.getSchema()); + hookData.setMediaServer(event.getMediaServer()); + hookData.setRecordInfo(event.getRecordInfo()); + }else { + hookData.setApp(mediaEvent.getApp()); + hookData.setStream(mediaEvent.getStream()); + hookData.setSchema(mediaEvent.getSchema()); + hookData.setMediaServer(mediaEvent.getMediaServer()); + } + return hookData; + } + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public MediaServer getMediaServer() { + return mediaServer; + } + + public void setMediaServer(MediaServer mediaServer) { + this.mediaServer = mediaServer; + } + + public String getSchema() { + return schema; + } + + public void setSchema(String schema) { + this.schema = schema; + } + + public MediaInfo getMediaInfo() { + return mediaInfo; + } + + public void setMediaInfo(MediaInfo mediaInfo) { + this.mediaInfo = mediaInfo; + } + + public String getParams() { + return params; + } + + public void setParams(String params) { + this.params = params; + } + + public RecordInfo getRecordInfo() { + return recordInfo; + } + + public void setRecordInfo(RecordInfo recordInfo) { + this.recordInfo = recordInfo; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java index f5dac01c..64b8ffeb 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java @@ -1,172 +1,105 @@ package com.genersoft.iot.vmp.media.event.hook; -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; -import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.event.media.MediaEvent; +import com.genersoft.iot.vmp.media.event.media.MediaPublishEvent; +import org.mybatis.logging.Logger; +import org.mybatis.logging.LoggerFactory; +import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; import java.time.Instant; -import java.util.*; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; /** - * ZLMediaServer的hook事件订阅 + * zlm hook事件的参数 * @author lin */ @Component public class HookSubscribe { - private final static Logger logger = LoggerFactory.getLogger(HookSubscribe.class); + /** + * 订阅数据过期时间 + */ + private final long subscribeExpire = 5 * 1000; @FunctionalInterface public interface Event{ - void response(MediaServer mediaServerItem, HookParam hookParam); + void response(HookData data); } /** * 流到来的处理 */ @Async("taskExecutor") - @org.springframework.context.event.EventListener + @EventListener public void onApplicationEvent(MediaArrivalEvent event) { - for (HookType hookType : allSubscribes.keySet()) { - if (hookType.equals(HookType.on_stream_changed)) { - - } - } - } - - private Map> allSubscribes = new ConcurrentHashMap<>(); - - public void addSubscribe(IHookSubscribe hookSubscribe, HookSubscribe.Event event) { - if (hookSubscribe.getExpires() == null) { - // 默认5分钟过期 - Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5)); - hookSubscribe.setExpires(expiresInstant); - } - allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event); - } - - public HookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) { - HookSubscribe.Event event= null; - Map eventMap = allSubscribes.get(type); - if (eventMap == null) { - return null; - } - for (IHookSubscribe key : eventMap.keySet()) { - Boolean result = null; - - for (String s : key.getContent().keySet()) { - if (result == null) { - result = key.getContent().getString(s).equals(hookResponse.getString(s)); - }else { - if (key.getContent().getString(s) == null) { - continue; - } - result = result && key.getContent().getString(s).equals(hookResponse.getString(s)); - } - } - if (null != result && result) { - event = eventMap.get(key); - } - } - return event; - } - - public void removeSubscribe(IHookSubscribe hookSubscribe) { - Map eventMap = allSubscribes.get(hookSubscribe.getHookType()); - if (eventMap == null) { - return; + if ("rtsp".equals(event.getSchema())) { + System.out.println("流到来的处理: " + allSubscribes.size()); + sendNotify(HookType.on_media_arrival, event); } - Set> entries = eventMap.entrySet(); - if (entries.size() > 0) { - List> entriesToRemove = new ArrayList<>(); - for (Map.Entry entry : entries) { - JSONObject content = entry.getKey().getContent(); - if (content == null || content.size() == 0) { - entriesToRemove.add(entry); - continue; - } - Boolean result = null; - for (String s : content.keySet()) { - if (result == null) { - result = content.getString(s).equals(hookSubscribe.getContent().getString(s)); - }else { - if (content.getString(s) == null) { - continue; - } - result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s)); - } - } - if (result){ - entriesToRemove.add(entry); - } - } - - if (!CollectionUtils.isEmpty(entriesToRemove)) { - for (Map.Entry entry : entriesToRemove) { - eventMap.remove(entry.getKey()); - } - if (eventMap.size() == 0) { - allSubscribes.remove(hookSubscribe.getHookType()); - } - } - - } } /** - * 获取某个类型的所有的订阅 - * @param type - * @return + * 流结束事件 */ - public List getSubscribes(HookType type) { - Map eventMap = allSubscribes.get(type); - if (eventMap == null) { - return null; + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaDepartureEvent event) { + if ("rtsp".equals(event.getSchema())) { + sendNotify(HookType.on_media_departure, event); } - List result = new ArrayList<>(); - for (IHookSubscribe key : eventMap.keySet()) { - result.add(eventMap.get(key)); - } - return result; + + } + /** + * 推流鉴权事件 + */ + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaPublishEvent event) { + sendNotify(HookType.on_publish, event); } - public List getAll(){ - ArrayList result = new ArrayList<>(); - Collection> values = allSubscribes.values(); - for (Map value : values) { - result.addAll(value.keySet()); + private final Map allSubscribes = new ConcurrentHashMap<>(); + private final Map allHook = new ConcurrentHashMap<>(); + + private void sendNotify(HookType hookType, MediaEvent event) { + Hook paramHook = Hook.getInstance(hookType, event.getApp(), event.getStream(), event.getMediaServer().getId()); + Event hookSubscribeEvent = allSubscribes.get(paramHook.toString()); + if (hookSubscribeEvent != null) { + HookData data = HookData.getInstance(event); + hookSubscribeEvent.response(data); } - return result; + } + + public void addSubscribe(Hook hook, HookSubscribe.Event event) { + if (hook.getCreateTime() == null) { + hook.setCreateTime(System.currentTimeMillis()); + } + allSubscribes.put(hook.toString(), event); + allHook.put(hook.toString(), hook); + } + + public void removeSubscribe(Hook hook) { + allSubscribes.remove(hook.toString()); + allHook.remove(hook.toString()); } /** * 对订阅数据进行过期清理 */ -// @Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次 - @Scheduled(fixedRate = 2 * 1000) + @Scheduled(fixedRate=subscribeExpire) //每5分钟执行一次 public void execute(){ - Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5)); - int total = 0; - for (HookType hookType : allSubscribes.keySet()) { - Map hookSubscribeEventMap = allSubscribes.get(hookType); - if (hookSubscribeEventMap.size() > 0) { - for (IHookSubscribe hookSubscribe : hookSubscribeEventMap.keySet()) { - if (hookSubscribe.getExpires().isBefore(instant)) { - // 过期的 - hookSubscribeEventMap.remove(hookSubscribe); - total ++; - } - } + long expireTime = System.currentTimeMillis() - subscribeExpire; + for (Hook hook : allHook.values()) { + if (hook.getCreateTime() < expireTime) { + allSubscribes.remove(hook.toString()); + allHook.remove(hook.toString()); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribeFactory.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribeFactory.java deleted file mode 100755 index 9182205e..00000000 --- a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribeFactory.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.genersoft.iot.vmp.media.event.hook; - - -import com.alibaba.fastjson2.JSONObject; - -/** - * hook 订阅工厂 - * @author lin - */ -public class HookSubscribeFactory { - - public static HookSubscribeForStreamChange on_stream_changed(String app, String stream, boolean regist, String scheam, String mediaServerId) { - HookSubscribeForStreamChange hookSubscribe = new HookSubscribeForStreamChange(); - JSONObject subscribeKey = new com.alibaba.fastjson2.JSONObject(); - subscribeKey.put("app", app); - subscribeKey.put("stream", stream); - subscribeKey.put("regist", regist); - if (scheam != null) { - subscribeKey.put("schema", scheam); - } - subscribeKey.put("mediaServerId", mediaServerId); - hookSubscribe.setContent(subscribeKey); - - return hookSubscribe; - } - - public static HookSubscribeForRtpServerTimeout on_rtp_server_timeout(String stream, String ssrc, String mediaServerId) { - HookSubscribeForRtpServerTimeout hookSubscribe = new HookSubscribeForRtpServerTimeout(); - JSONObject subscribeKey = new com.alibaba.fastjson2.JSONObject(); - subscribeKey.put("stream_id", stream); - subscribeKey.put("ssrc", ssrc); - subscribeKey.put("mediaServerId", mediaServerId); - hookSubscribe.setContent(subscribeKey); - return hookSubscribe; - } - - public static HookSubscribeForStreamPush on_publish(String app, String stream, String scheam, String mediaServerId) { - HookSubscribeForStreamPush hookSubscribe = new HookSubscribeForStreamPush(); - JSONObject subscribeKey = new JSONObject(); - subscribeKey.put("app", app); - subscribeKey.put("stream", stream); - if (scheam != null) { - subscribeKey.put("schema", scheam); - } - subscribeKey.put("mediaServerId", mediaServerId); - hookSubscribe.setContent(subscribeKey); - - return hookSubscribe; - } - - public static HookSubscribeForRecordMp4 on_record_mp4(String mediaServerId, String app, String stream) { - HookSubscribeForRecordMp4 hookSubscribe = new HookSubscribeForRecordMp4(); - JSONObject subscribeKey = new com.alibaba.fastjson2.JSONObject(); - subscribeKey.put("app", app); - subscribeKey.put("stream", stream); - subscribeKey.put("mediaServerId", mediaServerId); - hookSubscribe.setContent(subscribeKey); - - return hookSubscribe; - } - -} diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribeForRecordMp4.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribeForRecordMp4.java deleted file mode 100755 index 46f62a89..00000000 --- a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribeForRecordMp4.java +++ /dev/null @@ -1,44 +0,0 @@ -package com.genersoft.iot.vmp.media.event.hook; - -import com.alibaba.fastjson2.JSONObject; -import com.alibaba.fastjson2.annotation.JSONField; - -import java.time.Instant; - -/** - * hook订阅-录像完成 - * @author lin - */ -public class HookSubscribeForRecordMp4 implements IHookSubscribe{ - - private HookType hookType = HookType.on_record_mp4; - - private JSONObject content; - - @JSONField(format="yyyy-MM-dd HH:mm:ss") - private Instant expires; - - @Override - public HookType getHookType() { - return hookType; - } - - @Override - public JSONObject getContent() { - return content; - } - - public void setContent(JSONObject content) { - this.content = content; - } - - @Override - public Instant getExpires() { - return expires; - } - - @Override - public void setExpires(Instant expires) { - this.expires = expires; - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribeForRtpServerTimeout.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribeForRtpServerTimeout.java deleted file mode 100755 index d526c785..00000000 --- a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribeForRtpServerTimeout.java +++ /dev/null @@ -1,44 +0,0 @@ -package com.genersoft.iot.vmp.media.event.hook; - -import com.alibaba.fastjson2.JSONObject; -import com.alibaba.fastjson2.annotation.JSONField; - -import java.time.Instant; - -/** - * hook订阅-收流超时 - * @author lin - */ -public class HookSubscribeForRtpServerTimeout implements IHookSubscribe{ - - private HookType hookType = HookType.on_rtp_server_timeout; - - private JSONObject content; - - @JSONField(format="yyyy-MM-dd HH:mm:ss") - private Instant expires; - - @Override - public HookType getHookType() { - return hookType; - } - - @Override - public JSONObject getContent() { - return content; - } - - public void setContent(JSONObject content) { - this.content = content; - } - - @Override - public Instant getExpires() { - return expires; - } - - @Override - public void setExpires(Instant expires) { - this.expires = expires; - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribeForStreamChange.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribeForStreamChange.java deleted file mode 100755 index b8d26755..00000000 --- a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribeForStreamChange.java +++ /dev/null @@ -1,44 +0,0 @@ -package com.genersoft.iot.vmp.media.event.hook; - -import com.alibaba.fastjson2.JSONObject; -import com.alibaba.fastjson2.annotation.JSONField; - -import java.time.Instant; - -/** - * hook订阅-流变化 - * @author lin - */ -public class HookSubscribeForStreamChange implements IHookSubscribe{ - - private HookType hookType = HookType.on_stream_changed; - - private JSONObject content; - - @JSONField(format="yyyy-MM-dd HH:mm:ss") - private Instant expires; - - @Override - public HookType getHookType() { - return hookType; - } - - @Override - public JSONObject getContent() { - return content; - } - - public void setContent(JSONObject content) { - this.content = content; - } - - @Override - public Instant getExpires() { - return expires; - } - - @Override - public void setExpires(Instant expires) { - this.expires = expires; - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribeForStreamPush.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribeForStreamPush.java deleted file mode 100644 index e6191814..00000000 --- a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribeForStreamPush.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.genersoft.iot.vmp.media.event.hook; - - -import com.alibaba.fastjson2.JSONObject; - -import java.time.Instant; - -/** - * hook订阅-开始推流 - * @author lin - */ -public class HookSubscribeForStreamPush implements IHookSubscribe{ - - private HookType hookType = HookType.on_publish; - - private JSONObject content; - - private Instant expires; - - @Override - public HookType getHookType() { - return hookType; - } - - @Override - public JSONObject getContent() { - return content; - } - - public void setContent(JSONObject content) { - this.content = content; - } - - @Override - public Instant getExpires() { - return expires; - } - - @Override - public void setExpires(Instant expires) { - this.expires = expires; - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookType.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookType.java index 77e37a82..58bd6568 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookType.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookType.java @@ -10,6 +10,6 @@ public enum HookType { on_publish, on_record_mp4, on_media_arrival, - on_stream_changed, + on_media_departure, on_rtp_server_timeout, } diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/IHookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/IHookSubscribe.java deleted file mode 100755 index 7fb48ce7..00000000 --- a/src/main/java/com/genersoft/iot/vmp/media/event/hook/IHookSubscribe.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.genersoft.iot.vmp.media.event.hook; - -import com.alibaba.fastjson2.JSONObject; - -import java.time.Instant; - -/** - * zlm hook事件的参数 - * @author lin - */ -public interface IHookSubscribe { - - /** - * 获取hook类型 - * @return hook类型 - */ - HookType getHookType(); - - /** - * 获取hook的具体内容 - * @return hook的具体内容 - */ - JSONObject getContent(); - - /** - * 设置过期时间 - * @param instant 过期时间 - */ - void setExpires(Instant instant); - - /** - * 获取过期时间 - * @return 过期时间 - */ - Instant getExpires(); -} diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/MediaArrivalEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaArrivalEvent.java similarity index 58% rename from src/main/java/com/genersoft/iot/vmp/media/event/MediaArrivalEvent.java rename to src/main/java/com/genersoft/iot/vmp/media/event/media/MediaArrivalEvent.java index b70fcf79..af1e23e5 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/event/MediaArrivalEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaArrivalEvent.java @@ -1,21 +1,20 @@ -package com.genersoft.iot.vmp.media.event; +package com.genersoft.iot.vmp.media.event.media; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import org.springframework.context.ApplicationEvent; /** * 流到来事件 */ -public class MediaArrivalEvent extends ApplicationEvent { +public class MediaArrivalEvent extends MediaEvent { public MediaArrivalEvent(Object source) { super(source); } public static MediaArrivalEvent getInstance(Object source, OnStreamChangedHookParam hookParam, MediaServer mediaServer){ MediaArrivalEvent mediaArrivalEvent = new MediaArrivalEvent(source); - mediaArrivalEvent.setMediaInfo(MediaInfo.getInstance(hookParam)); + mediaArrivalEvent.setMediaInfo(MediaInfo.getInstance(hookParam, mediaServer)); mediaArrivalEvent.setApp(hookParam.getApp()); mediaArrivalEvent.setStream(hookParam.getStream()); mediaArrivalEvent.setMediaServer(mediaServer); @@ -26,14 +25,6 @@ public class MediaArrivalEvent extends ApplicationEvent { private MediaInfo mediaInfo; - private String app; - - private String stream; - - private MediaServer mediaServer; - - private String schema; - private String callId; public MediaInfo getMediaInfo() { @@ -44,37 +35,6 @@ public class MediaArrivalEvent extends ApplicationEvent { this.mediaInfo = mediaInfo; } - public String getApp() { - return app; - } - - public void setApp(String app) { - this.app = app; - } - - public String getStream() { - return stream; - } - - public void setStream(String stream) { - this.stream = stream; - } - - public MediaServer getMediaServer() { - return mediaServer; - } - - public void setMediaServer(MediaServer mediaServer) { - this.mediaServer = mediaServer; - } - - public String getSchema() { - return schema; - } - - public void setSchema(String schema) { - this.schema = schema; - } public String getCallId() { return callId; diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaDepartureEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaDepartureEvent.java new file mode 100644 index 00000000..5cf87de2 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaDepartureEvent.java @@ -0,0 +1,22 @@ +package com.genersoft.iot.vmp.media.event.media; + +import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; + +/** + * 流离开事件 + */ +public class MediaDepartureEvent extends MediaEvent { + public MediaDepartureEvent(Object source) { + super(source); + } + + public static MediaDepartureEvent getInstance(Object source, OnStreamChangedHookParam hookParam, MediaServer mediaServer){ + MediaDepartureEvent mediaDepartureEven = new MediaDepartureEvent(source); + mediaDepartureEven.setApp(hookParam.getApp()); + mediaDepartureEven.setStream(hookParam.getStream()); + mediaDepartureEven.setSchema(hookParam.getSchema()); + mediaDepartureEven.setMediaServer(mediaServer); + return mediaDepartureEven; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/MediaDepartureEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaEvent.java similarity index 58% rename from src/main/java/com/genersoft/iot/vmp/media/event/MediaDepartureEvent.java rename to src/main/java/com/genersoft/iot/vmp/media/event/media/MediaEvent.java index f1113852..54d0a420 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/event/MediaDepartureEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaEvent.java @@ -1,16 +1,16 @@ -package com.genersoft.iot.vmp.media.event; +package com.genersoft.iot.vmp.media.event.media; import com.genersoft.iot.vmp.media.bean.MediaInfo; -import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookListener; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import org.springframework.context.ApplicationEvent; /** - * 流离开事件 + * 流到来事件 */ -public class MediaDepartureEvent extends ApplicationEvent { - public MediaDepartureEvent(Object source) { +public class MediaEvent extends ApplicationEvent { + + public MediaEvent(Object source) { super(source); } @@ -22,14 +22,6 @@ public class MediaDepartureEvent extends ApplicationEvent { private String schema; - public static MediaDepartureEvent getInstance(Object source, OnStreamChangedHookParam hookParam, MediaServer mediaServer){ - MediaDepartureEvent mediaDepartureEven = new MediaDepartureEvent(source); - mediaDepartureEven.setApp(hookParam.getApp()); - mediaDepartureEven.setStream(hookParam.getStream()); - mediaDepartureEven.setSchema(hookParam.getSchema()); - mediaDepartureEven.setMediaServer(mediaServer); - return mediaDepartureEven; - } public String getApp() { return app; @@ -62,4 +54,5 @@ public class MediaDepartureEvent extends ApplicationEvent { public void setSchema(String schema) { this.schema = schema; } + } diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaNotFoundEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaNotFoundEvent.java new file mode 100644 index 00000000..b45f55e8 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaNotFoundEvent.java @@ -0,0 +1,22 @@ +package com.genersoft.iot.vmp.media.event.media; + +import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamNotFoundHookParam; + +/** + * 流未找到 + */ +public class MediaNotFoundEvent extends MediaEvent { + public MediaNotFoundEvent(Object source) { + super(source); + } + + public static MediaNotFoundEvent getInstance(Object source, OnStreamNotFoundHookParam hookParam, MediaServer mediaServer){ + MediaNotFoundEvent mediaDepartureEven = new MediaNotFoundEvent(source); + mediaDepartureEven.setApp(hookParam.getApp()); + mediaDepartureEven.setStream(hookParam.getStream()); + mediaDepartureEven.setSchema(hookParam.getSchema()); + mediaDepartureEven.setMediaServer(mediaServer); + return mediaDepartureEven; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaPublishEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaPublishEvent.java new file mode 100644 index 00000000..b50f8965 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaPublishEvent.java @@ -0,0 +1,33 @@ +package com.genersoft.iot.vmp.media.event.media; + +import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnPublishHookParam; + +/** + * 推流鉴权事件 + */ +public class MediaPublishEvent extends MediaEvent { + public MediaPublishEvent(Object source) { + super(source); + } + + public static MediaPublishEvent getInstance(Object source, OnPublishHookParam hookParam, MediaServer mediaServer){ + MediaPublishEvent mediaPublishEvent = new MediaPublishEvent(source); + mediaPublishEvent.setApp(hookParam.getApp()); + mediaPublishEvent.setStream(hookParam.getStream()); + mediaPublishEvent.setMediaServer(mediaServer); + mediaPublishEvent.setSchema(hookParam.getSchema()); + mediaPublishEvent.setParams(hookParam.getParams()); + return mediaPublishEvent; + } + + private String params; + + public String getParams() { + return params; + } + + public void setParams(String params) { + this.params = params; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaRecordMp4Event.java b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaRecordMp4Event.java new file mode 100644 index 00000000..e4750c1c --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaRecordMp4Event.java @@ -0,0 +1,36 @@ +package com.genersoft.iot.vmp.media.event.media; + +import com.genersoft.iot.vmp.media.bean.RecordInfo; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; + +/** + * 录像文件生成事件 + */ +public class MediaRecordMp4Event extends MediaEvent { + public MediaRecordMp4Event(Object source) { + super(source); + } + + private RecordInfo recordInfo; + + public static MediaRecordMp4Event getInstance(Object source, OnRecordMp4HookParam hookParam, MediaServer mediaServer){ + MediaRecordMp4Event mediaRecordMp4Event = new MediaRecordMp4Event(source); + mediaRecordMp4Event.setApp(hookParam.getApp()); + mediaRecordMp4Event.setStream(hookParam.getStream()); + RecordInfo recordInfo = RecordInfo.getInstance(hookParam); + mediaRecordMp4Event.setRecordInfo(recordInfo); + mediaRecordMp4Event.setMediaServer(mediaServer); + return mediaRecordMp4Event; + } + + public RecordInfo getRecordInfo() { + return recordInfo; + } + + public void setRecordInfo(RecordInfo recordInfo) { + this.recordInfo = recordInfo; + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaRtpServerTimeoutEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaRtpServerTimeoutEvent.java new file mode 100644 index 00000000..939c8526 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaRtpServerTimeoutEvent.java @@ -0,0 +1,22 @@ +package com.genersoft.iot.vmp.media.event.media; + +import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; + +/** + * RtpServer收流超时事件 + */ +public class MediaRtpServerTimeoutEvent extends MediaEvent { + public MediaRtpServerTimeoutEvent(Object source) { + super(source); + } + + public static MediaRtpServerTimeoutEvent getInstance(Object source, OnStreamChangedHookParam hookParam, MediaServer mediaServer){ + MediaRtpServerTimeoutEvent mediaDepartureEven = new MediaRtpServerTimeoutEvent(source); + mediaDepartureEven.setApp(hookParam.getApp()); + mediaDepartureEven.setStream(hookParam.getStream()); + mediaDepartureEven.setSchema(hookParam.getSchema()); + mediaDepartureEven.setMediaServer(mediaServer); + return mediaDepartureEven; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/MediaSendRtpStoppedEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaSendRtpStoppedEvent.java similarity index 96% rename from src/main/java/com/genersoft/iot/vmp/media/event/MediaSendRtpStoppedEvent.java rename to src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaSendRtpStoppedEvent.java index 93192809..c9679f75 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/event/MediaSendRtpStoppedEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaSendRtpStoppedEvent.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.media.event; +package com.genersoft.iot.vmp.media.event.mediaServer; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamNotFoundHookParam; diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerChangeEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerChangeEvent.java similarity index 94% rename from src/main/java/com/genersoft/iot/vmp/media/event/MediaServerChangeEvent.java rename to src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerChangeEvent.java index 0da054fe..e3650454 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerChangeEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerChangeEvent.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.media.event; +package com.genersoft.iot.vmp.media.event.mediaServer; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import org.springframework.context.ApplicationEvent; diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerDeleteEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerDeleteEvent.java similarity index 75% rename from src/main/java/com/genersoft/iot/vmp/media/event/MediaServerDeleteEvent.java rename to src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerDeleteEvent.java index 10a368b4..a716ff0e 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerDeleteEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerDeleteEvent.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.media.event; +package com.genersoft.iot.vmp.media.event.mediaServer; /** * zlm在线事件 diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerEventAbstract.java b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerEventAbstract.java similarity index 89% rename from src/main/java/com/genersoft/iot/vmp/media/event/MediaServerEventAbstract.java rename to src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerEventAbstract.java index 91806f8d..a9bf7698 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerEventAbstract.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerEventAbstract.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.media.event; +package com.genersoft.iot.vmp.media.event.mediaServer; import org.springframework.context.ApplicationEvent; diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerOfflineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerOfflineEvent.java similarity index 61% rename from src/main/java/com/genersoft/iot/vmp/media/event/MediaServerOfflineEvent.java rename to src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerOfflineEvent.java index ce3c5a9d..2f9ac235 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerOfflineEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerOfflineEvent.java @@ -1,6 +1,4 @@ -package com.genersoft.iot.vmp.media.event; - -import com.genersoft.iot.vmp.media.event.MediaServerEventAbstract; +package com.genersoft.iot.vmp.media.event.mediaServer; /** * zlm离线事件类 diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerOnlineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerOnlineEvent.java similarity index 60% rename from src/main/java/com/genersoft/iot/vmp/media/event/MediaServerOnlineEvent.java rename to src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerOnlineEvent.java index 5d9bdc43..673dce43 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerOnlineEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerOnlineEvent.java @@ -1,6 +1,4 @@ -package com.genersoft.iot.vmp.media.event; - -import com.genersoft.iot.vmp.media.event.MediaServerEventAbstract; +package com.genersoft.iot.vmp.media.event.mediaServer; /** * zlm在线事件 diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerStatusEventListener.java b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java similarity index 97% rename from src/main/java/com/genersoft/iot/vmp/media/event/MediaServerStatusEventListener.java rename to src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java index 0d8e38c0..2413f567 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerStatusEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.media.event; +package com.genersoft.iot.vmp.media.event.mediaServer; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IStreamProxyService; diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java index de095eac..4ec0bfed 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java @@ -104,4 +104,36 @@ public interface IMediaServerService { Map getFFmpegCMDs(MediaServer mediaServer); + /** + * 根据应用名和流ID获取播放地址, 通过zlm接口检查是否存在 + * @param app + * @param stream + * @return + */ + StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId,String addr, boolean authority); + + + /** + * 根据应用名和流ID获取播放地址, 通过zlm接口检查是否存在, 返回的ip使用远程访问ip,适用与zlm与wvp在一台主机的情况 + * @param app + * @param stream + * @return + */ + StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId, boolean authority); + + /** + * 根据应用名和流ID获取播放地址, 只是地址拼接 + * @param app + * @param stream + * @return + */ + StreamInfo getStreamInfoByAppAndStream(MediaServer mediaServerItem, String app, String stream, MediaInfo mediaInfo, String callId); + + /** + * 根据应用名和流ID获取播放地址, 只是地址拼接,返回的ip使用远程访问ip,适用与zlm与wvp在一台主机的情况 + * @param app + * @param stream + * @return + */ + StreamInfo getStreamInfoByAppAndStream(MediaServer mediaServerItem, String app, String stream, MediaInfo mediaInfo, String addr, String callId, boolean isPlay); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 58f92f8c..30a9b831 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -4,17 +4,19 @@ import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.media.bean.MediaInfo; -import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; -import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; -import com.genersoft.iot.vmp.media.event.MediaServerChangeEvent; -import com.genersoft.iot.vmp.media.event.MediaServerDeleteEvent; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerDeleteEvent; import com.genersoft.iot.vmp.media.service.IMediaNodeServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; +import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -74,6 +76,10 @@ public class MediaServerServiceImpl implements IMediaServerService { @Autowired private ApplicationEventPublisher applicationEventPublisher; + @Autowired + private MediaConfig mediaConfig; + + /** * 流到来的处理 @@ -714,4 +720,63 @@ public class MediaServerServiceImpl implements IMediaServerService { } return mediaNodeServerService.getFFmpegCMDs(mediaServer); } + + @Override + public StreamInfo getStreamInfoByAppAndStream(MediaServer mediaServerItem, String app, String stream, MediaInfo mediaInfo, String callId) { + return getStreamInfoByAppAndStream(mediaServerItem, app, stream, mediaInfo, null, callId, true); + } + + @Override + public StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId, String addr, boolean authority) { + StreamInfo streamInfo = null; + if (mediaServerId == null) { + mediaServerId = mediaConfig.getId(); + } + MediaServer mediaInfo = getOne(mediaServerId); + if (mediaInfo == null) { + return null; + } + String calld = null; + StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream); + if (streamAuthorityInfo != null) { + calld = streamAuthorityInfo.getCallId(); + } + List streamInfoList = getMediaList(mediaInfo, app, stream, calld); + if (streamInfoList.isEmpty()) { + return null; + }else { + return streamInfoList.get(0); + } + } + + + + @Override + public StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId, boolean authority) { + return getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, null, authority); + } + + @Override + public StreamInfo getStreamInfoByAppAndStream(MediaServer mediaServer, String app, String stream, MediaInfo mediaInfo, String addr, String callId, boolean isPlay) { + StreamInfo streamInfoResult = new StreamInfo(); + streamInfoResult.setStream(stream); + streamInfoResult.setApp(app); + if (addr == null) { + addr = mediaServer.getStreamIp(); + } + + streamInfoResult.setIp(addr); + streamInfoResult.setMediaServerId(mediaServer.getId()); + String callIdParam = ObjectUtils.isEmpty(callId)?"":"?callId=" + callId; + streamInfoResult.setRtmp(addr, mediaServer.getRtmpPort(),mediaServer.getRtmpSSlPort(), app, stream, callIdParam); + streamInfoResult.setRtsp(addr, mediaServer.getRtspPort(),mediaServer.getRtspSSLPort(), app, stream, callIdParam); + streamInfoResult.setFlv(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app, stream, callIdParam); + streamInfoResult.setFmp4(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app, stream, callIdParam); + streamInfoResult.setHls(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app, stream, callIdParam); + streamInfoResult.setTs(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app, stream, callIdParam); + streamInfoResult.setRtc(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app, stream, callIdParam, isPlay); + + streamInfoResult.setMediaInfo(mediaInfo); + return streamInfoResult; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index fc128590..2717fc3d 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -11,9 +11,10 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.media.bean.ResultForOnPublish; -import com.genersoft.iot.vmp.media.event.*; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; +import com.genersoft.iot.vmp.media.event.media.*; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig; @@ -175,13 +176,6 @@ public class ZLMHttpHookListener { return new HookResultForOnPublish(200, "success"); } - taskExecutor.execute(() -> { - HookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json); - if (subscribe != null) { - subscribe.response(mediaServer, param); - } - }); - ResultForOnPublish resultForOnPublish = mediaService.authenticatePublish(mediaServer, param.getApp(), param.getStream(), param.getParams()); if (resultForOnPublish != null) { HookResultForOnPublish successResult = HookResultForOnPublish.getInstance(resultForOnPublish); @@ -316,19 +310,12 @@ public class ZLMHttpHookListener { MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId()); if (mediaServerItem != null) { event.setMediaServer(mediaServerItem); + event.setApp("rtp"); applicationEventPublisher.publishEvent(event); } }catch (Exception e) { logger.info("[ZLM-HOOK-rtpServer收流超时] 发送通知失败 ", e); } - taskExecutor.execute(() -> { - List subscribes = this.subscribe.getSubscribes(HookType.on_rtp_server_timeout); - if (subscribes != null && !subscribes.isEmpty()) { - for (HookSubscribe.Event subscribe : subscribes) { - subscribe.response(null, param); - } - } - }); return HookResult.SUCCESS(); } @@ -341,16 +328,16 @@ public class ZLMHttpHookListener { public HookResult onRecordMp4(HttpServletRequest request, @RequestBody OnRecordMp4HookParam param) { logger.info("[ZLM HOOK] 录像完成事件:{}->{}", param.getMediaServerId(), param.getFile_path()); - taskExecutor.execute(() -> { - List subscribes = this.subscribe.getSubscribes(HookType.on_record_mp4); - if (subscribes != null && !subscribes.isEmpty()) { - for (HookSubscribe.Event subscribe : subscribes) { - subscribe.response(null, param); - } + try { + MediaRecordMp4Event event = new MediaRecordMp4Event(this); + MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId()); + if (mediaServerItem != null) { + event.setMediaServer(mediaServerItem); + applicationEventPublisher.publishEvent(event); } - cloudRecordService.addRecord(param); - - }); + }catch (Exception e) { + logger.info("[ZLM-HOOK-rtpServer收流超时] 发送通知失败 ", e); + } return HookResult.SUCCESS(); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java index 60327ebf..11e6063b 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java @@ -158,7 +158,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { return null; } JSONObject mediaJSON = data.getJSONObject(0); - MediaInfo mediaInfo = MediaInfo.getInstance(mediaJSON); + MediaInfo mediaInfo = MediaInfo.getInstance(mediaJSON, mediaServer); StreamInfo streamInfo = getStreamInfoByAppAndStream(mediaServer, app, stream, mediaInfo, callId, true); if (streamInfo != null) { streamInfoList.add(streamInfo); @@ -207,7 +207,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { if (jsonObject.getInteger("code") != 0) { return null; } - return MediaInfo.getInstance(jsonObject); + return MediaInfo.getInstance(jsonObject, mediaServer); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java index 6d633298..27e62b2b 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java @@ -5,8 +5,8 @@ import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.media.event.MediaServerChangeEvent; -import com.genersoft.iot.vmp.media.event.MediaServerDeleteEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerDeleteEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig; diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamAuthorityInfo.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamAuthorityInfo.java index 66d07de2..8b722a98 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamAuthorityInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamAuthorityInfo.java @@ -1,6 +1,6 @@ package com.genersoft.iot.vmp.media.zlm.dto; -import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; /** * 流的鉴权信息 diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java index eda660eb..797286fe 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java @@ -2,7 +2,7 @@ package com.genersoft.iot.vmp.media.zlm.dto; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.GbStream; -import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.utils.DateUtil; import io.swagger.v3.oas.annotations.media.Schema; diff --git a/src/main/java/com/genersoft/iot/vmp/service/ICloudRecordService.java b/src/main/java/com/genersoft/iot/vmp/service/ICloudRecordService.java index bffd25b5..86ee7b8d 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/ICloudRecordService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/ICloudRecordService.java @@ -20,11 +20,6 @@ public interface ICloudRecordService { */ PageInfo getList(int page, int count, String query, String app, String stream, String startTime, String endTime, List mediaServerItems); - /** - * 根据hook消息增加一条记录 - */ - void addRecord(OnRecordMp4HookParam param); - /** * 获取所有的日期 */ diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java index d3a57f7a..649c7276 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java @@ -10,39 +10,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; */ public interface IMediaService { - /** - * 根据应用名和流ID获取播放地址, 通过zlm接口检查是否存在 - * @param app - * @param stream - * @return - */ - StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId,String addr, boolean authority); - - - /** - * 根据应用名和流ID获取播放地址, 通过zlm接口检查是否存在, 返回的ip使用远程访问ip,适用与zlm与wvp在一台主机的情况 - * @param app - * @param stream - * @return - */ - StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId, boolean authority); - - /** - * 根据应用名和流ID获取播放地址, 只是地址拼接 - * @param app - * @param stream - * @return - */ - StreamInfo getStreamInfoByAppAndStream(MediaServer mediaServerItem, String app, String stream, MediaInfo mediaInfo, String callId); - - /** - * 根据应用名和流ID获取播放地址, 只是地址拼接,返回的ip使用远程访问ip,适用与zlm与wvp在一台主机的情况 - * @param app - * @param stream - * @return - */ - StreamInfo getStreamInfoByAppAndStream(MediaServer mediaServerItem, String app, String stream, MediaInfo mediaInfo, String addr, String callId, boolean isPlay); - /** * 播放鉴权 */ diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 88cac66c..9ba4964a 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.bean.ErrorCallback; @@ -30,7 +31,7 @@ public interface IPlayService { ErrorCallback callback); SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback); - StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, HookParam hookParam, String deviceId, String channelId); + StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId); MediaServer getNewMediaServerItem(Device device); diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/CloudRecordItem.java b/src/main/java/com/genersoft/iot/vmp/service/bean/CloudRecordItem.java index 771e4c81..c9a54bc9 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/CloudRecordItem.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/CloudRecordItem.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.service.bean; +import com.genersoft.iot.vmp.media.event.media.MediaRecordMp4Event; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam; /** @@ -76,18 +77,18 @@ public class CloudRecordItem { */ private long timeLen; - public static CloudRecordItem getInstance(OnRecordMp4HookParam param) { + public static CloudRecordItem getInstance(MediaRecordMp4Event param) { CloudRecordItem cloudRecordItem = new CloudRecordItem(); cloudRecordItem.setApp(param.getApp()); cloudRecordItem.setStream(param.getStream()); - cloudRecordItem.setStartTime(param.getStart_time()*1000); - cloudRecordItem.setFileName(param.getFile_name()); - cloudRecordItem.setFolder(param.getFolder()); - cloudRecordItem.setFileSize(param.getFile_size()); - cloudRecordItem.setFilePath(param.getFile_path()); - cloudRecordItem.setMediaServerId(param.getMediaServerId()); - cloudRecordItem.setTimeLen((long) param.getTime_len() * 1000); - cloudRecordItem.setEndTime((param.getStart_time() + (long)param.getTime_len()) * 1000); + cloudRecordItem.setStartTime(param.getRecordInfo().getStartTime()*1000); + cloudRecordItem.setFileName(param.getRecordInfo().getFileName()); + cloudRecordItem.setFolder(param.getRecordInfo().getFolder()); + cloudRecordItem.setFileSize(param.getRecordInfo().getFileSize()); + cloudRecordItem.setFilePath(param.getRecordInfo().getFilePath()); + cloudRecordItem.setMediaServerId(param.getMediaServer().getId()); + cloudRecordItem.setTimeLen((long) param.getRecordInfo().getTimeLen() * 1000); + cloudRecordItem.setEndTime((param.getRecordInfo().getStartTime() + (long)param.getRecordInfo().getTimeLen()) * 1000); return cloudRecordItem; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/CloudRecordServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/CloudRecordServiceImpl.java index de4af6c5..6907e908 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/CloudRecordServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/CloudRecordServiceImpl.java @@ -5,6 +5,8 @@ import com.alibaba.fastjson2.JSONObject; import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; +import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.event.media.MediaRecordMp4Event; import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; @@ -24,6 +26,8 @@ import org.apache.commons.lang3.ObjectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.time.*; @@ -99,14 +103,15 @@ public class CloudRecordServiceImpl implements ICloudRecordService { return new ArrayList<>(resultSet); } - @Override - public void addRecord(OnRecordMp4HookParam param) { - CloudRecordItem cloudRecordItem = CloudRecordItem.getInstance(param); - StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream()); + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaRecordMp4Event event) { + CloudRecordItem cloudRecordItem = CloudRecordItem.getInstance(event); + StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(event.getApp(), event.getStream()); if (streamAuthorityInfo != null) { cloudRecordItem.setCallId(streamAuthorityInfo.getCallId()); } - logger.info("[添加录像记录] {}/{} 文件大小:{}, 时长: {}秒", param.getApp(), param.getStream(), param.getFile_size(),param.getTime_len()); + logger.info("[添加录像记录] {}/{} 内容:{}", event.getApp(), event.getStream(), event.getRecordInfo()); cloudRecordServiceMapper.add(cloudRecordItem); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java index 360e29c0..6e009609 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java @@ -6,8 +6,8 @@ import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionStatus; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; -import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index 3fcb9d08..126ea241 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -49,12 +49,6 @@ public class MediaServiceImpl implements IMediaService { @Autowired private IRedisCatchStorage redisCatchStorage; - @Autowired - private IMediaServerService mediaServerService; - - @Autowired - private MediaConfig mediaConfig; - @Autowired private IStreamProxyService streamProxyService; @@ -88,67 +82,6 @@ public class MediaServiceImpl implements IMediaService { @Autowired private ISIPCommander commander; - - - @Override - public StreamInfo getStreamInfoByAppAndStream(MediaServer mediaServerItem, String app, String stream, MediaInfo mediaInfo, String callId) { - return getStreamInfoByAppAndStream(mediaServerItem, app, stream, mediaInfo, null, callId, true); - } - - @Override - public StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId, String addr, boolean authority) { - StreamInfo streamInfo = null; - if (mediaServerId == null) { - mediaServerId = mediaConfig.getId(); - } - MediaServer mediaInfo = mediaServerService.getOne(mediaServerId); - if (mediaInfo == null) { - return null; - } - String calld = null; - StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream); - if (streamAuthorityInfo != null) { - calld = streamAuthorityInfo.getCallId(); - } - List streamInfoList = mediaServerService.getMediaList(mediaInfo, app, stream, calld); - if (streamInfoList.isEmpty()) { - return null; - }else { - return streamInfoList.get(0); - } - } - - - - @Override - public StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId, boolean authority) { - return getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, null, authority); - } - - @Override - public StreamInfo getStreamInfoByAppAndStream(MediaServer mediaServer, String app, String stream, MediaInfo mediaInfo, String addr, String callId, boolean isPlay) { - StreamInfo streamInfoResult = new StreamInfo(); - streamInfoResult.setStream(stream); - streamInfoResult.setApp(app); - if (addr == null) { - addr = mediaServer.getStreamIp(); - } - - streamInfoResult.setIp(addr); - streamInfoResult.setMediaServerId(mediaServer.getId()); - String callIdParam = ObjectUtils.isEmpty(callId)?"":"?callId=" + callId; - streamInfoResult.setRtmp(addr, mediaServer.getRtmpPort(),mediaServer.getRtmpSSlPort(), app, stream, callIdParam); - streamInfoResult.setRtsp(addr, mediaServer.getRtspPort(),mediaServer.getRtspSSLPort(), app, stream, callIdParam); - streamInfoResult.setFlv(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app, stream, callIdParam); - streamInfoResult.setFmp4(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app, stream, callIdParam); - streamInfoResult.setHls(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app, stream, callIdParam); - streamInfoResult.setTs(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app, stream, callIdParam); - streamInfoResult.setRtc(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app, stream, callIdParam, isPlay); - - streamInfoResult.setMediaInfo(mediaInfo); - return streamInfoResult; - } - @Override public boolean authenticatePlay(String app, String stream, String callId) { if (app == null || stream == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index cfb9abbc..48a8fbba 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -1,9 +1,7 @@ package com.genersoft.iot.vmp.service.impl; import com.baomidou.dynamic.datasource.annotation.DS; -import com.genersoft.iot.vmp.common.InviteInfo; -import com.genersoft.iot.vmp.common.InviteSessionStatus; -import com.genersoft.iot.vmp.common.InviteSessionType; +import com.genersoft.iot.vmp.common.*; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; @@ -11,10 +9,12 @@ import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; -import com.genersoft.iot.vmp.media.event.MediaSendRtpStoppedEvent; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookData; +import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; @@ -72,7 +72,7 @@ public class PlatformServiceImpl implements IPlatformService { private IMediaServerService mediaServerService; @Autowired - private SIPCommanderFroPlatform commanderForPlatform; + private ISIPCommanderForPlatform commanderForPlatform; @Autowired private DynamicTask dynamicTask; @@ -520,11 +520,11 @@ public class PlatformServiceImpl implements IPlatformService { inviteStreamService.removeInviteInfo(inviteInfoForOld); }else { // 流确实尚在推流,直接回调结果 - OnStreamChangedHookParam hookParam = new OnStreamChangedHookParam(); - hookParam.setApp(inviteInfoForOld.getStreamInfo().getApp()); - hookParam.setStream(inviteInfoForOld.getStreamInfo().getStream()); - - hookEvent.response(mediaServerItemForStreamInfo, hookParam); + HookData hookData = new HookData(); + hookData.setApp(inviteInfoForOld.getStreamInfo().getApp()); + hookData.setStream(inviteInfoForOld.getStreamInfo().getStream()); + hookData.setMediaServer(mediaServerItemForStreamInfo); + hookEvent.response(hookData); return; } } @@ -582,14 +582,14 @@ public class PlatformServiceImpl implements IPlatformService { } } }, userSetting.getPlayTimeout()); - commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, hookParam)->{ + commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (hookData)->{ logger.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId); dynamicTask.stop(timeOutTaskKey); // hook响应 - playService.onPublishHandlerForPlay(mediaServerItemForInvite, hookParam, platform.getServerGBId(), channelId); + playService.onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), platform.getServerGBId(), channelId); // 收到流 if (hookEvent != null) { - hookEvent.response(mediaServerItem, hookParam); + hookEvent.response(hookData); } }, event -> { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 1fad2be2..32af0575 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -17,16 +17,17 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.bean.MediaInfo; -import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; -import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; -import com.genersoft.iot.vmp.media.event.MediaNotFoundEvent; +import com.genersoft.iot.vmp.media.bean.RecordInfo; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookData; +import com.genersoft.iot.vmp.media.event.hook.HookType; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeForRecordMp4; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam; @@ -99,9 +100,6 @@ public class PlayServiceImpl implements IPlayService { @Autowired private SendRtpPortManager sendRtpPortManager; - @Autowired - private IMediaService mediaService; - @Autowired private IMediaServerService mediaServerService; @@ -423,12 +421,12 @@ public class PlayServiceImpl implements IPlayService { // 查看设备是否已经在推流 try { - cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (mediaServerItemInuse, hookParam) -> { - logger.info("[语音对讲] 流已生成, 开始推流: " + hookParam); + cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (hookData) -> { + logger.info("[语音对讲] 流已生成, 开始推流: " + hookData); dynamicTask.stop(timeOutTaskKey); // TODO 暂不做处理 - }, (mediaServerItemInuse, hookParam) -> { - logger.info("[语音对讲] 设备开始推流: " + hookParam); + }, (hookData) -> { + logger.info("[语音对讲] 设备开始推流: " + hookData); dynamicTask.stop(timeOutTaskKey); }, (event) -> { @@ -538,8 +536,7 @@ public class PlayServiceImpl implements IPlayService { streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream()); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 取消订阅消息监听 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); - subscribe.removeSubscribe(hookSubscribe); + subscribe.removeSubscribe(Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream(), mediaServerItem.getId())); } }else { logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}", @@ -554,11 +551,11 @@ public class PlayServiceImpl implements IPlayService { }, userSetting.getPlayTimeout()); try { - cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (mediaServerItemInuse, hookParam ) -> { - logger.info("收到订阅消息: " + hookParam); + cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (hookData ) -> { + logger.info("收到订阅消息: " + hookData); dynamicTask.stop(timeOutTaskKey); // hook响应 - StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channel.getChannelId()); + StreamInfo streamInfo = onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), device.getDeviceId(), channel.getChannelId()); if (streamInfo == null){ callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); @@ -574,7 +571,7 @@ public class PlayServiceImpl implements IPlayService { streamInfo); logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channel.getChannelId(), channel.getStreamIdentification()); - snapOnPlay(mediaServerItemInuse, device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream()); + snapOnPlay(hookData.getMediaServer(), device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream()); }, (eventResult) -> { // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel.getChannelId(), @@ -700,11 +697,10 @@ public class PlayServiceImpl implements IPlayService { mediaServerService.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName); } - public StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, HookParam hookParam, String deviceId, String channelId) { + public StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId) { StreamInfo streamInfo = null; Device device = redisCatchStorage.getDevice(deviceId); - OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; - streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId); + streamInfo = onPublishHandler(mediaServerItem, mediaInfo, deviceId, channelId); if (streamInfo != null) { DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { @@ -722,9 +718,8 @@ public class PlayServiceImpl implements IPlayService { } - private StreamInfo onPublishHandlerForPlayback(MediaServer mediaServerItem, HookParam param, String deviceId, String channelId, String startTime, String endTime) { - OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) param; - StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId); + private StreamInfo onPublishHandlerForPlayback(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId, String startTime, String endTime) { + StreamInfo streamInfo = onPublishHandler(mediaServerItem, mediaInfo, deviceId, channelId); if (streamInfo != null) { streamInfo.setStartTime(startTime); streamInfo.setEndTime(endTime); @@ -733,7 +728,7 @@ public class PlayServiceImpl implements IPlayService { deviceChannel.setStreamId(streamInfo.getStream()); storager.startPlay(deviceId, channelId, streamInfo.getStream()); } - InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, ((OnStreamChangedHookParam) param).getStream()); + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, mediaInfo.getStream()); if (inviteInfo != null) { inviteInfo.setStatus(InviteSessionStatus.ok); @@ -839,10 +834,10 @@ public class PlayServiceImpl implements IPlayService { inviteStreamService.removeInviteInfo(inviteInfo); }; - HookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> { - logger.info("收到回放订阅消息: " + hookParam); + HookSubscribe.Event hookEvent = (hookData) -> { + logger.info("收到回放订阅消息: " + hookData); dynamicTask.stop(playBackTimeOutTaskKey); - StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime); + StreamInfo streamInfo = onPublishHandlerForPlayback(hookData.getMediaServer(), hookData.getMediaInfo(), deviceId, channelId, startTime, endTime); if (streamInfo == null) { logger.warn("设备回放API调用失败!"); callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), @@ -1028,10 +1023,10 @@ public class PlayServiceImpl implements IPlayService { streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); inviteStreamService.removeInviteInfo(inviteInfo); }; - HookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> { - logger.info("[录像下载]收到订阅消息: " + hookParam); + HookSubscribe.Event hookEvent = (hookData) -> { + logger.info("[录像下载]收到订阅消息: " + hookData); dynamicTask.stop(downLoadTimeOutTaskKey); - StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime); + StreamInfo streamInfo = onPublishHandlerForDownload(hookData.getMediaServer(), hookData.getMediaInfo(), deviceId, channelId, startTime, endTime); if (streamInfo == null) { logger.warn("[录像下载] 获取流地址信息失败"); callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), @@ -1049,26 +1044,24 @@ public class PlayServiceImpl implements IPlayService { downLoadTimeOutTaskKey, callback, inviteInfo, InviteSessionType.DOWNLOAD); // 注册录像回调事件,录像下载结束后写入下载地址 - HookSubscribe.Event hookEventForRecord = (mediaServerItemInuse, hookParam) -> { + HookSubscribe.Event hookEventForRecord = (hookData) -> { logger.info("[录像下载] 收到录像写入磁盘消息: , {}/{}-{}", inviteInfo.getDeviceId(), inviteInfo.getChannelId(), ssrcInfo.getStream()); - logger.info("[录像下载] 收到录像写入磁盘消息内容: " + hookParam); - OnRecordMp4HookParam recordMp4HookParam = (OnRecordMp4HookParam)hookParam; - String filePath = recordMp4HookParam.getFile_path(); + logger.info("[录像下载] 收到录像写入磁盘消息内容: " + hookData); + RecordInfo recordInfo = hookData.getRecordInfo(); + String filePath = recordInfo.getFilePath(); DownloadFileInfo downloadFileInfo = CloudRecordUtils.getDownloadFilePath(mediaServerItem, filePath); InviteInfo inviteInfoForNew = inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId() , inviteInfo.getChannelId(), inviteInfo.getStream()); inviteInfoForNew.getStreamInfo().setDownLoadFilePath(downloadFileInfo); inviteStreamService.updateInviteInfo(inviteInfoForNew); }; - HookSubscribeForRecordMp4 hookSubscribe = HookSubscribeFactory.on_record_mp4( - mediaServerItem.getId(), "rtp", ssrcInfo.getStream()); - + Hook hook = Hook.getInstance(HookType.on_record_mp4, "rtp", ssrcInfo.getStream(), mediaServerItem.getId()); // 设置过期时间,下载失败时自动处理订阅数据 // long difference = DateUtil.getDifference(startTime, endTime)/1000; // Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(difference * 2)); // hookSubscribe.setExpires(expiresInstant); - subscribe.addSubscribe(hookSubscribe, hookEventForRecord); + subscribe.addSubscribe(hook, hookEventForRecord); }); } catch (InvalidArgumentException | SipException | ParseException e) { logger.error("[命令发送失败] 录像下载: {}", e.getMessage()); @@ -1134,9 +1127,8 @@ public class PlayServiceImpl implements IPlayService { return inviteInfo.getStreamInfo(); } - private StreamInfo onPublishHandlerForDownload(MediaServer mediaServerItemInuse, HookParam hookParam, String deviceId, String channelId, String startTime, String endTime) { - OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam; - StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, streamChangedHookParam, deviceId, channelId); + private StreamInfo onPublishHandlerForDownload(MediaServer mediaServerItemInuse, MediaInfo mediaInfo, String deviceId, String channelId, String startTime, String endTime) { + StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, mediaInfo, deviceId, channelId); if (streamInfo != null) { streamInfo.setProgress(0); streamInfo.setStartTime(startTime); @@ -1153,9 +1145,8 @@ public class PlayServiceImpl implements IPlayService { } - public StreamInfo onPublishHandler(MediaServer mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) { - MediaInfo mediaInfo = MediaInfo.getInstance(hookParam); - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookParam.getStream(), mediaInfo, null); + public StreamInfo onPublishHandler(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId) { + StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", mediaInfo.getStream(), mediaInfo, null); streamInfo.setDeviceID(deviceId); streamInfo.setChannelId(channelId); return streamInfo; @@ -1220,7 +1211,7 @@ public class PlayServiceImpl implements IPlayService { AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult(); audioBroadcastResult.setApp(app); audioBroadcastResult.setStream(stream); - audioBroadcastResult.setStreamInfo(new StreamContent(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null, false))); + audioBroadcastResult.setStreamInfo(new StreamContent(mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null, false))); audioBroadcastResult.setCodec("G.711"); return audioBroadcastResult; } @@ -1591,7 +1582,7 @@ public class PlayServiceImpl implements IPlayService { } } - talk(mediaServerItem, device, channelId, stream, (mediaServerItem1, hookParam) -> { + talk(mediaServerItem, device, channelId, stream, (hookData) -> { logger.info("[语音对讲] 收到设备发来的流"); }, eventResult -> { logger.warn("[语音对讲] 失败,{}/{}, 错误码 {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index c768ae1a..2ac9de67 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -9,19 +9,18 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.bean.MediaInfo; -import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; -import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; -import com.genersoft.iot.vmp.media.event.MediaNotFoundEvent; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookType; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.IGbStreamService; -import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -65,9 +64,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Autowired private IVideoManagerStorage videoManagerStorager; - @Autowired - private IMediaService mediaService; - @Autowired private ZLMServerFactory zlmServerFactory; @@ -203,9 +199,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null); return; } - HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaServer.getId()); - hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( + Hook hook = Hook.getInstance(HookType.on_media_arrival, param.getApp(), param.getStream(), mediaServer.getId()); + hookSubscribe.addSubscribe(hook, (hookData) -> { + StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( mediaServer, param.getApp(), param.getStream(), null, null); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); }); @@ -213,7 +209,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { String talkKey = UUID.randomUUID().toString(); String delayTalkKey = UUID.randomUUID().toString(); dynamicTask.startDelay(delayTalkKey, ()->{ - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaServer.getId(), false); + StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaServer.getId(), false); if (streamInfo != null) { callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); }else { @@ -223,9 +219,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { }, 7000); WVPResult result = addStreamProxyToZlm(param); if (result != null && result.getCode() == 0) { - hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + hookSubscribe.removeSubscribe(hook); dynamicTask.stop(talkKey); - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( + StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( mediaServer, param.getApp(), param.getStream(), null, null); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); }else { @@ -244,7 +240,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } } else{ - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( + StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( mediaServer, param.getApp(), param.getStream(), null, null); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 4bdcc2fe..b3c510d0 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -11,8 +11,8 @@ import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.bean.MediaInfo; -import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; -import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java index d2d32e84..d14cea5c 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -6,10 +6,10 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.*; @@ -61,9 +61,9 @@ public class RedisGbPlayMsgListener implements MessageListener { */ public static final int ERROR_CODE_TIMEOUT = -3; - private Map callbacks = new ConcurrentHashMap<>(); - private Map callbacksForStartSendRtpStream = new ConcurrentHashMap<>(); - private Map callbacksForError = new ConcurrentHashMap<>(); + private final Map callbacks = new ConcurrentHashMap<>(); + private final Map callbacksForStartSendRtpStream = new ConcurrentHashMap<>(); + private final Map callbacksForError = new ConcurrentHashMap<>(); @Autowired private UserSetting userSetting; @@ -89,7 +89,7 @@ public class RedisGbPlayMsgListener implements MessageListener { @Autowired private HookSubscribe subscribe; - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired @@ -297,9 +297,8 @@ public class RedisGbPlayMsgListener implements MessageListener { }, userSetting.getPlatformPlayTimeout()); // 添加订阅 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtsp", mediaServerItem.getId()); - - subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam)->{ + Hook hook = Hook.getInstance(HookType.on_media_arrival, content.getApp(), content.getStream(), content.getMediaServerId()); + subscribe.addSubscribe(hook, (hookData)->{ dynamicTask.stop(taskKey); responseSendItem(mediaServerItem, content, toId, serial); }); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 7edf7e85..9cb4d38b 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -6,7 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 25233123..3c0b749e 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -9,7 +9,7 @@ import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java index 56d192e9..26e96857 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.security.JwtUtils; import com.genersoft.iot.vmp.conf.security.SecurityUtils; import com.genersoft.iot.vmp.conf.security.dto.LoginUser; +import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IStreamProxyService; @@ -35,11 +36,12 @@ public class MediaController { @Autowired private IRedisCatchStorage redisCatchStorage; - @Autowired - private IMediaService mediaService; @Autowired private IStreamProxyService streamProxyService; + @Autowired + private IMediaServerService mediaServerService; + /** * 根据应用名和流id获取播放地址 @@ -85,9 +87,9 @@ public class MediaController { String host = request.getHeader("Host"); String localAddr = host.split(":")[0]; logger.info("使用{}作为返回流的ip", localAddr); - streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, localAddr, authority); + streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, localAddr, authority); }else { - streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority); + streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority); } if (streamInfo != null){ @@ -105,9 +107,9 @@ public class MediaController { String host = request.getHeader("Host"); String localAddr = host.split(":")[0]; logger.info("使用{}作为返回流的ip", localAddr); - streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, localAddr, authority); + streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, localAddr, authority); }else { - streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority); + streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority); } if (streamInfo != null){ return new StreamContent(streamInfo); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java index 3e1e9733..dc01c438 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java @@ -6,12 +6,11 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.security.JwtUtils; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeForRtpServerTimeout; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam; import com.genersoft.iot.vmp.media.service.IMediaServerService; @@ -108,12 +107,11 @@ public class PsController { } // 注册回调如果rtp收流超时则通过回调发送通知 if (callBack != null) { - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(stream, String.valueOf(ssrcInt), mediaServerItem.getId()); + Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServerItem.getId()); // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 - hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout, - (mediaServerItemInUse, hookParam)->{ - OnRtpServerTimeoutHookParam serverTimeoutHookParam = (OnRtpServerTimeoutHookParam) hookParam; - if (stream.equals(serverTimeoutHookParam.getStream_id())) { + hookSubscribe.addSubscribe(hook, + (hookData)->{ + if (stream.equals(hookData.getStream())) { logger.info("[第三方PS服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId); // 将信息写入redis中,以备后用 redisTemplate.delete(receiveKey); @@ -126,7 +124,7 @@ public class PsController { } catch (IOException e) { logger.error("[第三方PS服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e); } - hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); + hookSubscribe.removeSubscribe(hook); } }); } @@ -241,18 +239,18 @@ public class PsController { }else { logger.info("[第三方PS服务对接->发送流] 流不存在,等待流上线,callId->{}", callId); String uuid = UUID.randomUUID().toString(); - HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId()); + Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServerItem.getId()); dynamicTask.startDelay(uuid, ()->{ logger.info("[第三方PS服务对接->发送流] 等待流上线超时 callId->{}", callId); redisTemplate.delete(key); - hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + hookSubscribe.removeSubscribe(hook); }, 10000); // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 OtherPsSendInfo finalSendInfo = sendInfo; - hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); - hookSubscribe.addSubscribe(hookSubscribeForStreamChange, - (mediaServerItemInUse, response)->{ + hookSubscribe.removeSubscribe(hook); + hookSubscribe.addSubscribe(hook, + (hookData)->{ dynamicTask.stop(uuid); logger.info("[第三方PS服务对接->发送流] 流上线,开始发流 callId->{}", callId); try { @@ -269,7 +267,7 @@ public class PsController { logger.info("[第三方PS服务对接->发送流] 视频流发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); throw new ControllerException(ErrorCode.ERROR100.getCode(), "[视频流发流失败] " + jsonObject.getString("msg")); } - hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + hookSubscribe.removeSubscribe(hook); }); } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java index c810745f..96b0d490 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -6,12 +6,11 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.security.JwtUtils; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeForRtpServerTimeout; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam; import com.genersoft.iot.vmp.media.service.IMediaServerService; @@ -109,12 +108,11 @@ public class RtpController { } // 注册回调如果rtp收流超时则通过回调发送通知 if (callBack != null) { - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(stream, String.valueOf(ssrcInt), mediaServerItem.getId()); + Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServerItem.getId()); // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 - hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout, - (mediaServerItemInUse, hookParam)->{ - OnRtpServerTimeoutHookParam serverTimeoutHookParam = (OnRtpServerTimeoutHookParam) hookParam; - if (stream.equals(serverTimeoutHookParam.getStream_id())) { + hookSubscribe.addSubscribe(hook, + (hookData)->{ + if (stream.equals(hookData.getStream())) { logger.info("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId); OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder(); OkHttpClient client = httpClientBuilder.build(); @@ -125,7 +123,7 @@ public class RtpController { } catch (IOException e) { logger.error("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e); } - hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); + hookSubscribe.removeSubscribe(hook); } }); } @@ -225,7 +223,7 @@ public class RtpController { if (!((dstPortForAudio > 0 && !ObjectUtils.isEmpty(dstPortForAudio) || (dstPortForVideo > 0 && !ObjectUtils.isEmpty(dstIpForVideo))))) { throw new ControllerException(ErrorCode.ERROR400.getCode(), "至少应该存在一组音频或视频发送参数"); } - MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer(); + MediaServer mediaServer = mediaServerService.getDefaultMediaServer(); String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); if (sendInfo == null) { @@ -278,10 +276,10 @@ public class RtpController { paramForVideo = null; } - Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServer, app, stream); if (streamReady) { if (paramForVideo != null) { - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForVideo); + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, paramForVideo); if (jsonObject.getInteger("code") == 0) { logger.info("[第三方服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, paramForVideo); redisTemplate.opsForValue().set(key, sendInfo); @@ -292,7 +290,7 @@ public class RtpController { } } if(paramForAudio != null) { - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForAudio); + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, paramForAudio); if (jsonObject.getInteger("code") == 0) { logger.info("[第三方服务对接->发送流] 音频流发流成功,callId->{},param->{}", callId, paramForAudio); redisTemplate.opsForValue().set(key, sendInfo); @@ -305,18 +303,18 @@ public class RtpController { }else { logger.info("[第三方服务对接->发送流] 流不存在,等待流上线,callId->{}", callId); String uuid = UUID.randomUUID().toString(); - HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId()); + Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServer.getId()); dynamicTask.startDelay(uuid, ()->{ logger.info("[第三方服务对接->发送流] 等待流上线超时 callId->{}", callId); redisTemplate.delete(key); - hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + hookSubscribe.removeSubscribe(hook); }, 10000); // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 OtherRtpSendInfo finalSendInfo = sendInfo; - hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); - hookSubscribe.addSubscribe(hookSubscribeForStreamChange, - (mediaServerItemInUse, response)->{ + hookSubscribe.removeSubscribe(hook); + hookSubscribe.addSubscribe(hook, + (hookData)->{ dynamicTask.stop(uuid); logger.info("[第三方服务对接->发送流] 流上线,开始发流 callId->{}", callId); try { @@ -325,7 +323,7 @@ public class RtpController { throw new RuntimeException(e); } if (paramForVideo != null) { - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForVideo); + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, paramForVideo); if (jsonObject.getInteger("code") == 0) { logger.info("[第三方服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, paramForVideo); redisTemplate.opsForValue().set(key, finalSendInfo); @@ -336,7 +334,7 @@ public class RtpController { } } if(paramForAudio != null) { - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForAudio); + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, paramForAudio); if (jsonObject.getInteger("code") == 0) { logger.info("[第三方服务对接->发送流] 音频流发流成功,callId->{},param->{}", callId, paramForAudio); redisTemplate.opsForValue().set(key, finalSendInfo); @@ -346,7 +344,7 @@ public class RtpController { throw new ControllerException(ErrorCode.ERROR100.getCode(), "[音频流发流失败] " + jsonObject.getString("msg")); } } - hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + hookSubscribe.removeSubscribe(hook); }); } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java index dc31b5a7..780f8a73 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java @@ -9,10 +9,9 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.VersionInfo; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.security.JwtUtils; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; -import com.genersoft.iot.vmp.media.event.hook.IHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; @@ -40,8 +39,6 @@ import java.util.List; @RequestMapping("/api/server") public class ServerController { - @Autowired - private HookSubscribe zlmHttpHookSubscribe; @Autowired private IMediaServerService mediaServerService; @@ -76,8 +73,6 @@ public class ServerController { @Autowired private IRedisCatchStorage redisCatchStorage; - @Autowired - private SendRtpPortManager sendRtpPortManager; @GetMapping(value = "/media_server/list") @@ -217,13 +212,6 @@ public class ServerController { return jsonObject; } - @GetMapping(value = "/hooks") - @ResponseBody - @Operation(summary = "获取当前所有hook") - public List getHooks() { - return zlmHttpHookSubscribe.getAll(); - } - @GetMapping(value = "/system/info") @ResponseBody @Operation(summary = "获取系统信息") diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java index 459f3cc8..309cd694 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java @@ -249,7 +249,7 @@ public class StreamPushController { if (push != null && !push.isSelf()) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "来自其他平台的推流信息"); } - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority); + StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority); if (streamInfo == null){ throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取播放地址失败"); }