diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 702cc0de..71ceb2e3 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -253,123 +253,17 @@ public class ZLMHttpHookListener { */ @ResponseBody @PostMapping(value = "/on_stream_not_found", produces = "application/json;charset=UTF-8") - public DeferredResult onStreamNotFound(@RequestBody OnStreamNotFoundHookParam param) { + public HookResult onStreamNotFound(@RequestBody OnStreamNotFoundHookParam param) { logger.info("[ZLM HOOK] 流未找到:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); - DeferredResult defaultResult = new DeferredResult<>(); MediaServer mediaServer = mediaServerService.getOne(param.getMediaServerId()); if (!userSetting.isAutoApplyPlay() || mediaServer == null) { - defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg())); - return defaultResult; + return HookResult.SUCCESS(); } MediaNotFoundEvent mediaNotFoundEvent = MediaNotFoundEvent.getInstance(this, param, mediaServer); applicationEventPublisher.publishEvent(mediaNotFoundEvent); - - - - if ("rtp".equals(param.getApp())) { - String[] s = param.getStream().split("_"); - if ((s.length != 2 && s.length != 4)) { - defaultResult.setResult(HookResult.SUCCESS()); - return defaultResult; - } - String deviceId = s[0]; - String channelId = s[1]; - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null || !device.isOnLine()) { - defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg())); - return defaultResult; - } - DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); - if (deviceChannel == null) { - defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg())); - return defaultResult; - } - if (s.length == 2) { - logger.info("[ZLM HOOK] 预览流未找到, 发起自动点播:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); - - RequestMessage msg = new RequestMessage(); - String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; - boolean exist = resultHolder.exist(key, null); - msg.setKey(key); - String uuid = UUID.randomUUID().toString(); - msg.setId(uuid); - DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); - - result.onTimeout(() -> { - logger.info("[ZLM HOOK] 预览流自动点播, 等待超时"); - msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "点播超时")); - resultHolder.invokeAllResult(msg); - inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); - storager.stopPlay(deviceId, channelId); - }); - - resultHolder.put(key, uuid, result); - - if (!exist) { - playService.play(mediaServer, deviceId, channelId, null, (code, message, data) -> { - msg.setData(new HookResult(code, message)); - resultHolder.invokeResult(msg); - }); - } - return result; - } else if (s.length == 4) { - // 此时为录像回放, 录像回放格式为> 设备ID_通道ID_开始时间_结束时间 - String startTimeStr = s[2]; - String endTimeStr = s[3]; - if (startTimeStr == null || endTimeStr == null || startTimeStr.length() != 14 || endTimeStr.length() != 14) { - defaultResult.setResult(HookResult.SUCCESS()); - return defaultResult; - } - String startTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(startTimeStr); - String endTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(endTimeStr); - logger.info("[ZLM HOOK] 回放流未找到, 发起自动点播:{}->{}->{}/{}-{}-{}", - param.getMediaServerId(), param.getSchema(), - param.getApp(), param.getStream(), - startTime, endTime - ); - RequestMessage msg = new RequestMessage(); - String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId; - boolean exist = resultHolder.exist(key, null); - msg.setKey(key); - String uuid = UUID.randomUUID().toString(); - msg.setId(uuid); - DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); - - result.onTimeout(() -> { - logger.info("[ZLM HOOK] 回放流自动点播, 等待超时"); - // 释放rtpserver - msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "点播超时")); - resultHolder.invokeResult(msg); - }); - - resultHolder.put(key, uuid, result); - - if (!exist) { - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, param.getStream(), null, - device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam()); - playService.playBack(mediaServer, ssrcInfo, deviceId, channelId, startTime, endTime, (code, message, data) -> { - msg.setData(new HookResult(code, message)); - resultHolder.invokeResult(msg); - }); - } - return result; - } else { - defaultResult.setResult(HookResult.SUCCESS()); - return defaultResult; - } - - } else { - // 拉流代理 - StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); - if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) { - streamProxyService.start(param.getApp(), param.getStream()); - } - DeferredResult result = new DeferredResult<>(); - result.setResult(HookResult.SUCCESS()); - return result; - } + return HookResult.SUCCESS(); } /** diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 722139e9..31a43c9d 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -13,12 +13,15 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; +import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; +import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.event.MediaNotFoundEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; @@ -28,6 +31,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRecordMp4; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookResult; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.*; @@ -49,6 +53,7 @@ import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; +import org.springframework.web.context.request.async.DeferredResult; import javax.sdp.*; import javax.sip.InvalidArgumentException; @@ -195,6 +200,53 @@ public class PlayServiceImpl implements IPlayService { } } + /** + * 流未找到的处理 + */ + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaNotFoundEvent event) { + if (!"rtp".equals(event.getApp())) { + return; + } + String[] s = event.getStream().split("_"); + if ((s.length != 2 && s.length != 4)) { + return; + } + String deviceId = s[0]; + String channelId = s[1]; + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null || !device.isOnLine()) { + return; + } + DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); + if (deviceChannel == null) { + return; + } + if (s.length == 2) { + logger.info("[ZLM HOOK] 预览流未找到, 发起自动点播:{}->{}->{}/{}", event.getMediaServer().getId(), event.getSchema(), event.getApp(), event.getStream()); + play(event.getMediaServer(), deviceId, channelId, null, null); + } else if (s.length == 4) { + // 此时为录像回放, 录像回放格式为> 设备ID_通道ID_开始时间_结束时间 + String startTimeStr = s[2]; + String endTimeStr = s[3]; + if (startTimeStr == null || endTimeStr == null || startTimeStr.length() != 14 || endTimeStr.length() != 14) { + return; + } + String startTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(startTimeStr); + String endTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(endTimeStr); + logger.info("[ZLM HOOK] 回放流未找到, 发起自动点播:{}->{}->{}/{}-{}-{}", + event.getMediaServer().getId(), event.getSchema(), + event.getApp(), event.getStream(), + startTime, endTime + ); + + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(event.getMediaServer(), event.getStream(), null, + device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam()); + playBack(event.getMediaServer(), ssrcInfo, deviceId, channelId, startTime, endTime, null); + } + } + @Override public SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 02c5b30d..0713ab35 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.event.MediaNotFoundEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; @@ -18,6 +19,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookResult; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IMediaService; @@ -44,6 +46,7 @@ import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; +import org.springframework.web.context.request.async.DeferredResult; import java.util.HashMap; import java.util.List; @@ -128,6 +131,22 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } } + /** + * 流离开的处理 + */ + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaNotFoundEvent event) { + if ("rtp".equals(event.getApp())) { + return; + } + // 拉流代理 + StreamProxyItem streamProxyByAppAndStream = getStreamProxyByAppAndStream(event.getApp(), event.getStream()); + if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) { + start(event.getApp(), event.getStream()); + } + } + @Override public void save(StreamProxyItem param, GeneralCallback callback) {