From 4c8e2beb4d369865893ec791430d95420d4acfad Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 8 Nov 2022 19:58:36 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96hook=E5=A4=84=E7=90=86?= =?UTF-8?q?=E9=80=9F=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/media/zlm/ZLMHttpHookListener.java | 574 +++++++----------- .../vmp/media/zlm/ZLMMediaListManager.java | 13 +- .../media/zlm/dto/ServerKeepaliveData.java | 4 + .../media/zlm/dto/StreamAuthorityInfo.java | 15 +- .../iot/vmp/media/zlm/dto/StreamPushItem.java | 14 +- .../media/zlm/dto/{ => hook}/HookParam.java | 2 +- .../zlm/dto/{ => hook}/OnPlayHookParam.java | 6 +- .../dto/{ => hook}/OnPublishHookParam.java | 6 +- .../dto/hook/OnSendRtpStoppedHookParam.java | 27 + .../dto/hook/OnServerKeepaliveHookParam.java | 20 + .../OnStreamChangedHookParam.java} | 17 +- .../dto/hook/OnStreamNoneReaderHookParam.java | 41 ++ .../dto/hook/OnStreamNotFoundHookParam.java | 86 +++ .../media/zlm/dto/{ => hook}/OriginType.java | 2 +- .../iot/vmp/service/IMediaServerService.java | 3 +- .../iot/vmp/service/IStreamPushService.java | 6 +- .../service/impl/MediaServerServiceImpl.java | 18 +- .../vmp/service/impl/MediaServiceImpl.java | 2 - .../service/impl/StreamProxyServiceImpl.java | 21 +- .../service/impl/StreamPushServiceImpl.java | 41 +- .../redisMsg/RedisStreamMsgListener.java | 28 +- .../iot/vmp/storager/IRedisCatchStorage.java | 8 +- .../storager/impl/RedisCatchStorageImpl.java | 22 +- .../components/console/ConsoleResource.vue | 12 +- 24 files changed, 513 insertions(+), 475 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ServerKeepaliveData.java rename src/main/java/com/genersoft/iot/vmp/media/zlm/dto/{ => hook}/HookParam.java (85%) rename src/main/java/com/genersoft/iot/vmp/media/zlm/dto/{ => hook}/OnPlayHookParam.java (87%) rename src/main/java/com/genersoft/iot/vmp/media/zlm/dto/{ => hook}/OnPublishHookParam.java (87%) create mode 100644 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnSendRtpStoppedHookParam.java create mode 100644 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnServerKeepaliveHookParam.java rename src/main/java/com/genersoft/iot/vmp/media/zlm/dto/{MediaItem.java => hook/OnStreamChangedHookParam.java} (96%) create mode 100644 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamNoneReaderHookParam.java create mode 100644 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamNotFoundHookParam.java rename src/main/java/com/genersoft/iot/vmp/media/zlm/dto/{ => hook}/OriginType.java (89%) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 41c17ca4..6392ada2 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -14,6 +14,7 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -108,17 +109,20 @@ public class ZLMHttpHookListener { */ @ResponseBody @PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8") - public JSONObject onServerKeepalive(@RequestBody JSONObject json){ + public JSONObject onServerKeepalive(@RequestBody OnServerKeepaliveHookParam param){ - logger.info("[ ZLM HOOK ]on_server_keepalive API调用,参数:" + json.toString()); - String mediaServerId = json.getString("mediaServerId"); - List subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive); - if (subscribes != null && subscribes.size() > 0) { - for (ZlmHttpHookSubscribe.Event subscribe : subscribes) { - subscribe.response(null, json); + logger.info("[ZLM HOOK] 收到zlm心跳:" + param.getMediaServerId()); + + taskExecutor.execute(()->{ + List subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive); + JSONObject json = (JSONObject) JSON.toJSON(param); + if (subscribes != null && subscribes.size() > 0) { + for (ZlmHttpHookSubscribe.Event subscribe : subscribes) { + subscribe.response(null, json); + } } - } - mediaServerService.updateMediaServerKeepalive(mediaServerId, json.getJSONObject("data")); + }); + mediaServerService.updateMediaServerKeepalive(param.getMediaServerId(), param.getData()); JSONObject ret = new JSONObject(); ret.put("code", 0); @@ -126,43 +130,6 @@ public class ZLMHttpHookListener { return ret; } - - /** - * 流量统计事件,播放器或推流器断开时并且耗用流量超过特定阈值时会触发此事件,阈值通过配置文件general.flowThreshold配置;此事件对回复不敏感。 - * - */ - @ResponseBody - @PostMapping(value = "/on_flow_report", produces = "application/json;charset=UTF-8") - public JSONObject onFlowReport(@RequestBody JSONObject json){ - - if (logger.isDebugEnabled()) { - logger.debug("[ ZLM HOOK ]on_flow_report API调用,参数:" + json.toString()); - } - JSONObject ret = new JSONObject(); - ret.put("code", 0); - ret.put("msg", "success"); - return ret; - } - - /** - * 访问http文件服务器上hls之外的文件时触发。 - * - */ - @ResponseBody - @PostMapping(value = "/on_http_access", produces = "application/json;charset=UTF-8") - public JSONObject onHttpAccess(@RequestBody JSONObject json){ - - if (logger.isDebugEnabled()) { - logger.debug("[ ZLM HOOK ]on_http_access API 调用,参数:" + json.toString()); - } - String mediaServerId = json.getString("mediaServerId"); - JSONObject ret = new JSONObject(); - ret.put("code", 0); - ret.put("err", ""); - ret.put("path", ""); - ret.put("second", 600); - return ret; - } /** * 播放器鉴权事件,rtsp/rtmp/http-flv/ws-flv/hls的播放都将触发此鉴权事件。 @@ -171,20 +138,21 @@ public class ZLMHttpHookListener { @ResponseBody @PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8") public JSONObject onPlay(@RequestBody OnPlayHookParam param){ - - JSONObject json = (JSONObject)JSON.toJSON(param); - if (logger.isDebugEnabled()) { - logger.debug("[ ZLM HOOK ]on_play API调用,参数:" + JSON.toJSONString(param)); + logger.debug("[ZLM HOOK] 播放鉴权:{}->{}" + param.getMediaServerId(), param); } String mediaServerId = param.getMediaServerId(); - ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_play, json); - if (subscribe != null ) { - MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); - if (mediaInfo != null) { - subscribe.response(mediaInfo, json); + + taskExecutor.execute(()->{ + JSONObject json = (JSONObject) JSON.toJSON(param); + ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_play, json); + if (subscribe != null ) { + MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); + if (mediaInfo != null) { + subscribe.response(mediaInfo, json); + } } - } + }); JSONObject ret = new JSONObject(); if (!"rtp".equals(param.getApp())) { Map paramMap = urlParamToMap(param.getParams()); @@ -211,7 +179,7 @@ public class ZLMHttpHookListener { JSONObject json = (JSONObject) JSON.toJSON(param); - logger.info("[ ZLM HOOK ]on_publish API调用,参数:" + json.toString()); + logger.info("[ZLM HOOK]推流鉴权:{}->{}", param.getMediaServerId(), param); JSONObject ret = new JSONObject(); String mediaServerId = json.getString("mediaServerId"); MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); @@ -258,21 +226,23 @@ public class ZLMHttpHookListener { ret.put("code", 0); ret.put("msg", "success"); - ret.put("enable_hls", true); + ret.put("enable_hls", false); + if (!"rtp".equals(param.getApp())) { ret.put("enable_audio", true); } - - ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json); - if (subscribe != null) { - if (mediaInfo != null) { - subscribe.response(mediaInfo, json); - }else { - ret.put("code", 1); - ret.put("msg", "zlm not register"); + taskExecutor.execute(()->{ + ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json); + if (subscribe != null) { + if (mediaInfo != null) { + subscribe.response(mediaInfo, json); + }else { + ret.put("code", 1); + ret.put("msg", "zlm not register"); + } } - } + }); if ("rtp".equals(param.getApp())) { ret.put("enable_mp4", userSetting.getRecordSip()); @@ -292,113 +262,10 @@ public class ZLMHttpHookListener { ret.put("mp4_max_second", 10); ret.put("enable_mp4", true); ret.put("enable_audio", true); - } } return ret; } - - - - /** - * 录制mp4完成后通知事件;此事件对回复不敏感。 - * - */ - @ResponseBody - @PostMapping(value = "/on_record_mp4", produces = "application/json;charset=UTF-8") - public JSONObject onRecordMp4(@RequestBody JSONObject json){ - - if (logger.isDebugEnabled()) { - logger.debug("[ ZLM HOOK ]on_record_mp4 API调用,参数:" + json.toString()); - } - String mediaServerId = json.getString("mediaServerId"); - JSONObject ret = new JSONObject(); - ret.put("code", 0); - ret.put("msg", "success"); - return ret; - } - /** - * 录制hls完成后通知事件;此事件对回复不敏感。 - * - */ - @ResponseBody - @PostMapping(value = "/on_record_ts", produces = "application/json;charset=UTF-8") - public JSONObject onRecordTs(@RequestBody JSONObject json){ - - if (logger.isDebugEnabled()) { - logger.debug("[ ZLM HOOK ]on_record_ts API调用,参数:" + json.toString()); - } - String mediaServerId = json.getString("mediaServerId"); - JSONObject ret = new JSONObject(); - ret.put("code", 0); - ret.put("msg", "success"); - return ret; - } - - /** - * rtsp专用的鉴权事件,先触发on_rtsp_realm事件然后才会触发on_rtsp_auth事件。 - * - */ - @ResponseBody - @PostMapping(value = "/on_rtsp_realm", produces = "application/json;charset=UTF-8") - public JSONObject onRtspRealm(@RequestBody JSONObject json){ - - if (logger.isDebugEnabled()) { - logger.debug("[ ZLM HOOK ]on_rtsp_realm API调用,参数:" + json.toString()); - } - String mediaServerId = json.getString("mediaServerId"); - JSONObject ret = new JSONObject(); - ret.put("code", 0); - ret.put("realm", ""); - return ret; - } - - - /** - * 该rtsp流是否开启rtsp专用方式的鉴权事件,开启后才会触发on_rtsp_auth事件。需要指出的是rtsp也支持url参数鉴权,它支持两种方式鉴权。 - * - */ - @ResponseBody - @PostMapping(value = "/on_rtsp_auth", produces = "application/json;charset=UTF-8") - public JSONObject onRtspAuth(@RequestBody JSONObject json){ - - if (logger.isDebugEnabled()) { - logger.debug("[ ZLM HOOK ]on_rtsp_auth API调用,参数:" + json.toString()); - } - String mediaServerId = json.getString("mediaServerId"); - JSONObject ret = new JSONObject(); - ret.put("code", 0); - ret.put("encrypted", false); - ret.put("passwd", "test"); - return ret; - } - - /** - * shell登录鉴权,ZLMediaKit提供简单的telnet调试方式,使用telnet 127.0.0.1 9000能进入MediaServer进程的shell界面。 - * - */ - @ResponseBody - @PostMapping(value = "/on_shell_login", produces = "application/json;charset=UTF-8") - public JSONObject onShellLogin(@RequestBody JSONObject json){ - - if (logger.isDebugEnabled()) { - logger.debug("[ ZLM HOOK ]on_shell_login API调用,参数:" + json.toString()); - } - String mediaServerId = json.getString("mediaServerId"); - ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_shell_login, json); - if (subscribe != null ) { - MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); - if (mediaInfo != null) { - subscribe.response(mediaInfo, json); - } - - } - - JSONObject ret = new JSONObject(); - ret.put("code", 0); - ret.put("msg", "success"); - return ret; - } /** * rtsp/rtmp流注册或注销时触发此事件;此事件对回复不敏感。 @@ -406,137 +273,139 @@ public class ZLMHttpHookListener { */ @ResponseBody @PostMapping(value = "/on_stream_changed", produces = "application/json;charset=UTF-8") - public JSONObject onStreamChanged(@RequestBody MediaItem item){ + public JSONObject onStreamChanged(@RequestBody OnStreamChangedHookParam param){ - logger.info("[ ZLM HOOK ]on_stream_changed API调用,参数:" + JSONObject.toJSONString(item)); - String mediaServerId = item.getMediaServerId(); - JSONObject json = (JSONObject) JSON.toJSON(item); - ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json); - if (subscribe != null ) { - MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); - if (mediaInfo != null) { - subscribe.response(mediaInfo, json); - } - } - // 流消失移除redis play - String app = item.getApp(); - String stream = item.getStream(); - String schema = item.getSchema(); - List tracks = item.getTracks(); - boolean regist = item.isRegist(); - if (regist) { - if (item.getOriginType() == OriginType.RTMP_PUSH.ordinal() - || item.getOriginType() == OriginType.RTSP_PUSH.ordinal() - || item.getOriginType() == OriginType.RTC_PUSH.ordinal()) { - - StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream); - if (streamAuthorityInfo == null) { - streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(item); - }else { - streamAuthorityInfo.setOriginType(item.getOriginType()); - streamAuthorityInfo.setOriginTypeStr(item.getOriginTypeStr()); - } - redisCatchStorage.updateStreamAuthorityInfo(app, stream, streamAuthorityInfo); - } + if (param.isRegist()) { + logger.info("[ZLM HOOK] 流注册, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); }else { - redisCatchStorage.removeStreamAuthorityInfo(app, stream); + logger.info("[ZLM HOOK] 流注销, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); } - if ("rtsp".equals(schema)){ - logger.info("on_stream_changed:注册->{}, app->{}, stream->{}", regist, app, stream); - if (regist) { - mediaServerService.addCount(mediaServerId); - }else { - mediaServerService.removeCount(mediaServerId); - } - if (item.getOriginType() == OriginType.PULL.ordinal() - || item.getOriginType() == OriginType.FFMPEG_PULL.ordinal()) { - // 设置拉流代理上线/离线 - streamProxyService.updateStatus(regist, app, stream); - } - if ("rtp".equals(app) && !regist ) { - StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(stream); - if (streamInfo!=null){ - redisCatchStorage.stopPlay(streamInfo); - storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); - }else{ - streamInfo = redisCatchStorage.queryPlayback(null, null, stream, null); - if (streamInfo != null) { - redisCatchStorage.stopPlayback(streamInfo.getDeviceID(), streamInfo.getChannelId(), - streamInfo.getStream(), null); - } - } - }else { - if (!"rtp".equals(app)){ - String type = OriginType.values()[item.getOriginType()].getType(); - MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); - if (mediaServerItem != null){ - if (regist) { - StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream); - String callId = null; - if (streamAuthorityInfo != null) { - callId = streamAuthorityInfo.getCallId(); - } - StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaServerItem, - app, stream, tracks, callId); - item.setStreamInfo(streamInfoByAppAndStream); - redisCatchStorage.addStream(mediaServerItem, type, app, stream, item); - if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() - || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() - || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { - item.setSeverId(userSetting.getServerId()); - zlmMediaListManager.addPush(item); - } - }else { - // 兼容流注销时类型从redis记录获取 - MediaItem mediaItem = redisCatchStorage.getStreamInfo(app, stream, mediaServerId); - if (mediaItem != null) { - type = OriginType.values()[mediaItem.getOriginType()].getType(); - redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, stream); - } - GbStream gbStream = storager.getGbStream(app, stream); - if (gbStream != null) { -// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); - } - zlmMediaListManager.removeMedia(app, stream); - } - if (type != null) { - // 发送流变化redis消息 - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", app); - jsonObject.put("stream", stream); - jsonObject.put("register", regist); - jsonObject.put("mediaServerId", mediaServerId); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); - } - } + JSONObject json = (JSONObject) JSON.toJSON(param); + taskExecutor.execute(()->{ + ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json); + if (subscribe != null ) { + MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); + if (mediaInfo != null) { + subscribe.response(mediaInfo, json); } } - if (!regist) { - List sendRtpItems = redisCatchStorage.querySendRTPServerByStream(stream); - if (sendRtpItems.size() > 0) { - for (SendRtpItem sendRtpItem : sendRtpItems) { - if (sendRtpItem.getApp().equals(app)) { - String platformId = sendRtpItem.getPlatformId(); - ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); - Device device = deviceService.getDevice(platformId); + // 流消失移除redis play + List tracks = param.getTracks(); + if (param.isRegist()) { + if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal() + || param.getOriginType() == OriginType.RTSP_PUSH.ordinal() + || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { - try { - if (platform != null) { - commanderFroPlatform.streamByeCmd(platform, sendRtpItem); - }else { - cmder.streamByeCmd(device, sendRtpItem.getChannelId(), stream, sendRtpItem.getCallId()); + StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream()); + if (streamAuthorityInfo == null) { + streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param); + }else { + streamAuthorityInfo.setOriginType(param.getOriginType()); + streamAuthorityInfo.setOriginTypeStr(param.getOriginTypeStr()); + } + redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo); + } + }else { + redisCatchStorage.removeStreamAuthorityInfo(param.getApp(), param.getStream()); + } + + if ("rtsp".equals(param.getSchema())){ + if (param.isRegist()) { + mediaServerService.addCount(param.getMediaServerId()); + }else { + mediaServerService.removeCount(param.getMediaServerId()); + } + if (param.getOriginType() == OriginType.PULL.ordinal() + || param.getOriginType() == OriginType.FFMPEG_PULL.ordinal()) { + // 设置拉流代理上线/离线 + streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream()); + } + if ("rtp".equals(param.getApp()) && !param.isRegist() ) { + StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(param.getStream()); + if (streamInfo!=null){ + redisCatchStorage.stopPlay(streamInfo); + storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); + }else{ + streamInfo = redisCatchStorage.queryPlayback(null, null, param.getStream(), null); + if (streamInfo != null) { + redisCatchStorage.stopPlayback(streamInfo.getDeviceID(), streamInfo.getChannelId(), + streamInfo.getStream(), null); + } + } + }else { + if (!"rtp".equals(param.getApp())){ + String type = OriginType.values()[param.getOriginType()].getType(); + MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId()); + + if (mediaServerItem != null){ + if (param.isRegist()) { + StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream()); + String callId = null; + if (streamAuthorityInfo != null) { + callId = streamAuthorityInfo.getCallId(); + } + StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaServerItem, + param.getApp(), param.getStream(), tracks, callId); + param.setStreamInfo(streamInfoByAppAndStream); + redisCatchStorage.addStream(mediaServerItem, type, param.getApp(), param.getStream(), param); + if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal() + || param.getOriginType() == OriginType.RTMP_PUSH.ordinal() + || param.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { + param.setSeverId(userSetting.getServerId()); + zlmMediaListManager.addPush(param); + } + }else { + // 兼容流注销时类型从redis记录获取 + OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo(param.getApp(), param.getStream(), param.getMediaServerId()); + if (onStreamChangedHookParam != null) { + type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType(); + redisCatchStorage.removeStream(mediaServerItem.getId(), type, param.getApp(), param.getStream()); + } + GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); + if (gbStream != null) { +// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); + } + zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); + } + if (type != null) { + // 发送流变化redis消息 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetting.getServerId()); + jsonObject.put("app", param.getApp()); + jsonObject.put("stream", param.getStream()); + jsonObject.put("register", param.isRegist()); + jsonObject.put("mediaServerId", param.getMediaServerId()); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + } + } + } + } + if (!param.isRegist()) { + List sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); + if (sendRtpItems.size() > 0) { + for (SendRtpItem sendRtpItem : sendRtpItems) { + if (sendRtpItem.getApp().equals(param.getApp())) { + String platformId = sendRtpItem.getPlatformId(); + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); + Device device = deviceService.getDevice(platformId); + + try { + if (platform != null) { + commanderFroPlatform.streamByeCmd(platform, sendRtpItem); + }else { + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); + } + } catch (SipException | InvalidArgumentException | ParseException | SsrcTransactionNotFoundException e) { + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); } - } catch (SipException | InvalidArgumentException | ParseException | SsrcTransactionNotFoundException e) { - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); } } } } } - } + }); JSONObject ret = new JSONObject(); ret.put("code", 0); @@ -550,19 +419,16 @@ public class ZLMHttpHookListener { */ @ResponseBody @PostMapping(value = "/on_stream_none_reader", produces = "application/json;charset=UTF-8") - public JSONObject onStreamNoneReader(@RequestBody JSONObject json){ + public JSONObject onStreamNoneReader(@RequestBody OnStreamNoneReaderHookParam param){ - logger.info("[ ZLM HOOK ]on_stream_none_reader API调用,参数:" + json.toString()); - String mediaServerId = json.getString("mediaServerId"); - String streamId = json.getString("stream"); - String app = json.getString("app"); + logger.info("[ZLM HOOK]流无人观看:{]->{}->{}/{}" + param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); JSONObject ret = new JSONObject(); ret.put("code", 0); // 录像下载 - ret.put("close", userSetting.getStreamOnDemand()); - if ("rtp".equals(app)){ + if ("rtp".equals(param.getApp())){ + ret.put("close", userSetting.getStreamOnDemand()); // 国标流, 点播/录像回放/录像下载 - StreamInfo streamInfoForPlayCatch = redisCatchStorage.queryPlayByStreamId(streamId); + StreamInfo streamInfoForPlayCatch = redisCatchStorage.queryPlayByStreamId(param.getStream()); // 点播 if (streamInfoForPlayCatch != null) { // 收到无人观看说明流也没有在往上级推送 @@ -596,7 +462,7 @@ public class ZLMHttpHookListener { return ret; } // 录像回放 - StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null, streamId, null); + StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null, param.getStream(), null); if (streamInfoForPlayBackCatch != null ) { if (streamInfoForPlayBackCatch.isPause()) { ret.put("close", false); @@ -617,7 +483,7 @@ public class ZLMHttpHookListener { return ret; } // 录像下载 - StreamInfo streamInfoForDownload = redisCatchStorage.queryDownload(null, null, streamId, null); + StreamInfo streamInfoForDownload = redisCatchStorage.queryDownload(null, null, param.getStream(), null); // 进行录像下载时无人观看不断流 if (streamInfoForDownload != null) { ret.put("close", false); @@ -626,19 +492,19 @@ public class ZLMHttpHookListener { }else { // 非国标流 推流/拉流代理 // 拉流代理 - StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, streamId); + StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); if (streamProxyItem != null ) { if (streamProxyItem.isEnable_remove_none_reader()) { // 无人观看自动移除 ret.put("close", true); - streamProxyService.del(app, streamId); + streamProxyService.del(param.getApp(), param.getStream()); String url = streamProxyItem.getUrl() != null?streamProxyItem.getUrl():streamProxyItem.getSrc_url(); - logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", app, streamId, url); + logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", param.getApp(), param.getStream(), url); }else if (streamProxyItem.isEnable_disable_none_reader()) { // 无人观看停用 ret.put("close", true); // 修改数据 - streamProxyService.stop(app, streamId); + streamProxyService.stop(param.getApp(), param.getStream()); }else { ret.put("close", false); } @@ -660,35 +526,33 @@ public class ZLMHttpHookListener { */ @ResponseBody @PostMapping(value = "/on_stream_not_found", produces = "application/json;charset=UTF-8") - public JSONObject onStreamNotFound(@RequestBody JSONObject json){ - if (logger.isDebugEnabled()) { - logger.debug("[ ZLM HOOK ]on_stream_not_found API调用,参数:" + json.toString()); - } - String mediaServerId = json.getString("mediaServerId"); - MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); - if (userSetting.isAutoApplyPlay() && mediaInfo != null) { - String app = json.getString("app"); - String streamId = json.getString("stream"); - if ("rtp".equals(app)) { - if (mediaInfo.isRtpEnable()) { - String[] s = streamId.split("_"); - if (s.length == 2) { - String deviceId = s[0]; - String channelId = s[1]; - Device device = redisCatchStorage.getDevice(deviceId); - if (device != null) { - playService.play(mediaInfo,deviceId, channelId, null, null, null); + public JSONObject onStreamNotFound(@RequestBody OnStreamNotFoundHookParam param){ + logger.info("[ZLM HOOK] 流未找到:{}->{}->{}/{}" + param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); + taskExecutor.execute(()->{ + MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); + if (userSetting.isAutoApplyPlay() && mediaInfo != null) { + if ("rtp".equals(param.getApp())) { + if (mediaInfo.isRtpEnable()) { + String[] s = param.getStream().split("_"); + if (s.length == 2) { + String deviceId = s[0]; + String channelId = s[1]; + Device device = redisCatchStorage.getDevice(deviceId); + if (device != null) { + playService.play(mediaInfo,deviceId, channelId, null, null, null); + } } } - } - }else { - // 拉流代理 - StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(app, streamId); - if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnable_disable_none_reader()) { - streamProxyService.start(app, streamId); + }else { + // 拉流代理 + StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); + if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnable_disable_none_reader()) { + streamProxyService.start(param.getApp(), param.getStream()); + } } } - } + }); + JSONObject ret = new JSONObject(); ret.put("code", 0); @@ -704,22 +568,20 @@ public class ZLMHttpHookListener { @PostMapping(value = "/on_server_started", produces = "application/json;charset=UTF-8") public JSONObject onServerStarted(HttpServletRequest request, @RequestBody JSONObject jsonObject){ - if (logger.isDebugEnabled()) { - logger.debug("[ ZLM HOOK ]on_server_started API调用,参数:" + jsonObject.toString()); - } - String remoteAddr = request.getRemoteAddr(); - jsonObject.put("ip", remoteAddr); - List subscribes = this.subscribe.getSubscribes(HookType.on_server_started); - if (subscribes != null && subscribes.size() > 0) { - for (ZlmHttpHookSubscribe.Event subscribe : subscribes) { - subscribe.response(null, jsonObject); + jsonObject.put("ip", request.getRemoteAddr()); + ZLMServerConfig zlmServerConfig = JSON.to(ZLMServerConfig.class, jsonObject); + zlmServerConfig.setIp(request.getRemoteAddr()); + logger.info("[ZLM HOOK] zlm 启动 " + zlmServerConfig.getGeneralMediaServerId()); + taskExecutor.execute(()->{ + List subscribes = this.subscribe.getSubscribes(HookType.on_server_started); + if (subscribes != null && subscribes.size() > 0) { + for (ZlmHttpHookSubscribe.Event subscribe : subscribes) { + subscribe.response(null, jsonObject); + } } - } - - ZLMServerConfig zlmServerConfig = jsonObject.to(ZLMServerConfig.class); - if (zlmServerConfig !=null ) { mediaServerService.zlmServerOnline(zlmServerConfig); - } + }); + JSONObject ret = new JSONObject(); ret.put("code", 0); ret.put("msg", "success"); @@ -731,33 +593,33 @@ public class ZLMHttpHookListener { */ @ResponseBody @PostMapping(value = "/on_send_rtp_stopped", produces = "application/json;charset=UTF-8") - public JSONObject onSendRtpStopped(HttpServletRequest request, @RequestBody JSONObject jsonObject){ + public JSONObject onSendRtpStopped(HttpServletRequest request, @RequestBody OnSendRtpStoppedHookParam param){ - logger.info("[ ZLM HOOK ]on_send_rtp_stopped API调用,参数:" + jsonObject); + logger.info("[ZLM HOOK] 发送rtp被动关闭:{}->{}/{}", param.getMediaServerId(), param.getApp(), param.getStream()); JSONObject ret = new JSONObject(); ret.put("code", 0); ret.put("msg", "success"); // 查找对应的上级推流,发送停止 - String app = jsonObject.getString("app"); - if (!"rtp".equals(app)) { + if (!"rtp".equals(param.getApp())) { return ret; } - String stream = jsonObject.getString("stream"); - List sendRtpItems = redisCatchStorage.querySendRTPServerByStream(stream); - if (sendRtpItems.size() > 0) { - for (SendRtpItem sendRtpItem : sendRtpItems) { - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); - try { - commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + taskExecutor.execute(()->{ + List sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); + if (sendRtpItems.size() > 0) { + for (SendRtpItem sendRtpItem : sendRtpItems) { + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); + try { + commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + } + redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), + sendRtpItem.getCallId(), sendRtpItem.getStreamId()); } - redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), - sendRtpItem.getCallId(), sendRtpItem.getStreamId()); } - } + }); return ret; diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java index d6159809..db2beb0c 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.media.zlm; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; @@ -67,19 +68,19 @@ public class ZLMMediaListManager { private Map channelOnPublishEvents = new ConcurrentHashMap<>(); - public StreamPushItem addPush(MediaItem mediaItem) { - StreamPushItem transform = streamPushService.transform(mediaItem); - StreamPushItem pushInDb = streamPushService.getPush(mediaItem.getApp(), mediaItem.getStream()); - transform.setPushIng(mediaItem.isRegist()); + public StreamPushItem addPush(OnStreamChangedHookParam onStreamChangedHookParam) { + StreamPushItem transform = streamPushService.transform(onStreamChangedHookParam); + StreamPushItem pushInDb = streamPushService.getPush(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); + transform.setPushIng(onStreamChangedHookParam.isRegist()); transform.setUpdateTime(DateUtil.getNow()); transform.setPushTime(DateUtil.getNow()); - transform.setSelf(userSetting.getServerId().equals(mediaItem.getSeverId())); + transform.setSelf(userSetting.getServerId().equals(onStreamChangedHookParam.getSeverId())); if (pushInDb == null) { transform.setCreateTime(DateUtil.getNow()); streamPushMapper.add(transform); }else { streamPushMapper.update(transform); - gbStreamMapper.updateMediaServer(mediaItem.getApp(), mediaItem.getStream(), mediaItem.getMediaServerId()); + gbStreamMapper.updateMediaServer(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), onStreamChangedHookParam.getMediaServerId()); } ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream()); if ( channelOnlineEventLister != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ServerKeepaliveData.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ServerKeepaliveData.java new file mode 100644 index 00000000..0cc81f27 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ServerKeepaliveData.java @@ -0,0 +1,4 @@ +package com.genersoft.iot.vmp.media.zlm.dto; + +public class ServerKeepaliveData { +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamAuthorityInfo.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamAuthorityInfo.java index 36a03630..ef77225d 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamAuthorityInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamAuthorityInfo.java @@ -1,5 +1,8 @@ package com.genersoft.iot.vmp.media.zlm.dto; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnPublishHookParam; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; + /** * 流的鉴权信息 * @author lin @@ -102,13 +105,13 @@ public class StreamAuthorityInfo { return streamAuthorityInfo; } - public static StreamAuthorityInfo getInstanceByHook(MediaItem mediaItem) { + public static StreamAuthorityInfo getInstanceByHook(OnStreamChangedHookParam onStreamChangedHookParam) { StreamAuthorityInfo streamAuthorityInfo = new StreamAuthorityInfo(); - streamAuthorityInfo.setApp(mediaItem.getApp()); - streamAuthorityInfo.setStream(mediaItem.getStream()); - streamAuthorityInfo.setId(mediaItem.getMediaServerId()); - streamAuthorityInfo.setOriginType(mediaItem.getOriginType()); - streamAuthorityInfo.setOriginTypeStr(mediaItem.getOriginTypeStr()); + streamAuthorityInfo.setApp(onStreamChangedHookParam.getApp()); + streamAuthorityInfo.setStream(onStreamChangedHookParam.getStream()); + streamAuthorityInfo.setId(onStreamChangedHookParam.getMediaServerId()); + streamAuthorityInfo.setOriginType(onStreamChangedHookParam.getOriginType()); + streamAuthorityInfo.setOriginTypeStr(onStreamChangedHookParam.getOriginTypeStr()); return streamAuthorityInfo; } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java index 1a73a205..ddcfbdd9 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java @@ -1,10 +1,10 @@ package com.genersoft.iot.vmp.media.zlm.dto; import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.utils.DateUtil; import io.swagger.v3.oas.annotations.media.Schema; import org.jetbrains.annotations.NotNull; -import org.springframework.util.unit.DataUnit; import java.util.List; @@ -59,7 +59,7 @@ public class StreamPushItem extends GbStream implements Comparable tracks; + private List tracks; /** * 音视频轨道 @@ -223,11 +223,11 @@ public class StreamPushItem extends GbStream implements Comparable getTracks() { + public List getTracks() { return tracks; } - public void setTracks(List tracks) { + public void setTracks(List tracks) { this.tracks = tracks; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookParam.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookParam.java similarity index 85% rename from src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookParam.java rename to src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookParam.java index 50e37234..46ccf227 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookParam.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookParam.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.media.zlm.dto; +package com.genersoft.iot.vmp.media.zlm.dto.hook; /** * zlm hook事件的参数 diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/OnPlayHookParam.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnPlayHookParam.java similarity index 87% rename from src/main/java/com/genersoft/iot/vmp/media/zlm/dto/OnPlayHookParam.java rename to src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnPlayHookParam.java index 92ecb47c..4d2c26fd 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/OnPlayHookParam.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnPlayHookParam.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.media.zlm.dto; +package com.genersoft.iot.vmp.media.zlm.dto.hook; /** * zlm hook事件中的on_play事件的参数 @@ -79,4 +79,8 @@ public class OnPlayHookParam extends HookParam{ this.vhost = vhost; } + @Override + public String toString() { + return String.format("%s://%s:%s/%s/%s?%s", schema, ip, port, app, stream, params); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/OnPublishHookParam.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnPublishHookParam.java similarity index 87% rename from src/main/java/com/genersoft/iot/vmp/media/zlm/dto/OnPublishHookParam.java rename to src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnPublishHookParam.java index 354c1195..e353163f 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/OnPublishHookParam.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnPublishHookParam.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.media.zlm.dto; +package com.genersoft.iot.vmp.media.zlm.dto.hook; /** * zlm hook事件中的on_publish事件的参数 @@ -79,4 +79,8 @@ public class OnPublishHookParam extends HookParam{ this.vhost = vhost; } + @Override + public String toString() { + return String.format("%s://%s:%s/%s/%s?%s", schema, ip, port, app, stream, params); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnSendRtpStoppedHookParam.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnSendRtpStoppedHookParam.java new file mode 100644 index 00000000..bbdada9d --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnSendRtpStoppedHookParam.java @@ -0,0 +1,27 @@ +package com.genersoft.iot.vmp.media.zlm.dto.hook; + +/** + * zlm hook事件中的on_send_rtp_stopped事件的参数 + * @author lin + */ +public class OnSendRtpStoppedHookParam extends HookParam{ + private String app; + private String stream; + + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnServerKeepaliveHookParam.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnServerKeepaliveHookParam.java new file mode 100644 index 00000000..23360560 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnServerKeepaliveHookParam.java @@ -0,0 +1,20 @@ +package com.genersoft.iot.vmp.media.zlm.dto.hook; + +import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; + +/** + * zlm hook事件中的on_play事件的参数 + * @author lin + */ +public class OnServerKeepaliveHookParam extends HookParam{ + + private ServerKeepaliveData data; + + public ServerKeepaliveData getData() { + return data; + } + + public void setData(ServerKeepaliveData data) { + this.data = data; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamChangedHookParam.java similarity index 96% rename from src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java rename to src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamChangedHookParam.java index 96cbfbd8..29f91c8b 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamChangedHookParam.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.media.zlm.dto; +package com.genersoft.iot.vmp.media.zlm.dto.hook; import com.genersoft.iot.vmp.common.StreamInfo; @@ -7,7 +7,7 @@ import java.util.List; /** * @author lin */ -public class MediaItem { +public class OnStreamChangedHookParam extends HookParam{ /** * 注册/注销 @@ -68,11 +68,6 @@ public class MediaItem { */ private String originUrl; - /** - * 流媒体服务器id - */ - private String mediaServerId; - /** * 服务器id */ @@ -412,14 +407,6 @@ public class MediaItem { this.docker = docker; } - public String getMediaServerId() { - return mediaServerId; - } - - public void setMediaServerId(String mediaServerId) { - this.mediaServerId = mediaServerId; - } - public StreamInfo getStreamInfo() { return streamInfo; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamNoneReaderHookParam.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamNoneReaderHookParam.java new file mode 100644 index 00000000..0282ee59 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamNoneReaderHookParam.java @@ -0,0 +1,41 @@ +package com.genersoft.iot.vmp.media.zlm.dto.hook; + +public class OnStreamNoneReaderHookParam extends HookParam{ + + private String schema; + private String app; + private String stream; + private String vhost; + + public String getSchema() { + return schema; + } + + public void setSchema(String schema) { + this.schema = schema; + } + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public String getVhost() { + return vhost; + } + + public void setVhost(String vhost) { + this.vhost = vhost; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamNotFoundHookParam.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamNotFoundHookParam.java new file mode 100644 index 00000000..20fdf823 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamNotFoundHookParam.java @@ -0,0 +1,86 @@ +package com.genersoft.iot.vmp.media.zlm.dto.hook; + +/** + * zlm hook事件中的on_stream_not_found事件的参数 + * @author lin + */ +public class OnStreamNotFoundHookParam extends HookParam{ + private String id; + private String app; + private String stream; + private String ip; + private String params; + private int port; + private String schema; + private String vhost; + + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public String getParams() { + return params; + } + + public void setParams(String params) { + this.params = params; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getSchema() { + return schema; + } + + public void setSchema(String schema) { + this.schema = schema; + } + + public String getVhost() { + return vhost; + } + + public void setVhost(String vhost) { + this.vhost = vhost; + } + + @Override + public String toString() { + return String.format("%s://%s:%s/%s/%s?%s", schema, ip, port, app, stream, params); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/OriginType.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OriginType.java similarity index 89% rename from src/main/java/com/genersoft/iot/vmp/media/zlm/dto/OriginType.java rename to src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OriginType.java index 630e8257..926cf4d4 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/OriginType.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OriginType.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.media.zlm.dto; +package com.genersoft.iot.vmp.media.zlm.dto.hook; public enum OriginType { // 不可调整顺序 diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java index 1a463cf6..f8d58695 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; @@ -86,7 +87,7 @@ public interface IMediaServerService { MediaServerItem getDefaultMediaServer(); - void updateMediaServerKeepalive(String mediaServerId, JSONObject data); + void updateMediaServerKeepalive(String mediaServerId, ServerKeepaliveData data); boolean checkRtpServer(MediaServerItem mediaServerItem, String rtp, String stream); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index 4bccc3f0..8885ed51 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -1,13 +1,11 @@ package com.genersoft.iot.vmp.service; import com.genersoft.iot.vmp.gb28181.bean.GbStream; -import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; -import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo; -import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto; import com.github.pagehelper.PageInfo; import java.util.List; @@ -38,7 +36,7 @@ public interface IStreamPushService { List getPushList(String mediaSererId); - StreamPushItem transform(MediaItem item); + StreamPushItem transform(OnStreamChangedHookParam item); StreamPushItem getPush(String app, String streamId); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 66346082..1f53c7f4 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -10,6 +10,7 @@ import java.util.Set; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -434,7 +435,7 @@ public class MediaServerServiceImpl implements IMediaServerService { if (mediaServerConfig != null && mediaServerConfig.getInteger("code") == 0) { logger.info("[zlm心跳到期]:{}验证后zlm仍在线,恢复心跳信息,请检查zlm是否可以正常向wvp发送心跳", serverItem.getId()); // 添加zlm信息 - updateMediaServerKeepalive(serverItem.getId(), mediaServerConfig); + updateMediaServerKeepalive(serverItem.getId(), null); }else { publisher.zlmOfflineEventPublish(serverItem.getId()); } @@ -526,15 +527,15 @@ public class MediaServerServiceImpl implements IMediaServerService { Map param = new HashMap<>(); param.put("api.secret",mediaServerItem.getSecret()); // -profile:v Baseline param.put("hook.enable","1"); - param.put("hook.on_flow_report",String.format("%s/on_flow_report", hookPrex)); + param.put("hook.on_flow_report",""); param.put("hook.on_play",String.format("%s/on_play", hookPrex)); - param.put("hook.on_http_access",String.format("%s/on_http_access", hookPrex)); + param.put("hook.on_http_access",""); param.put("hook.on_publish", String.format("%s/on_publish", hookPrex)); - param.put("hook.on_record_ts",String.format("%s/on_record_ts", hookPrex)); - param.put("hook.on_rtsp_auth",String.format("%s/on_rtsp_auth", hookPrex)); - param.put("hook.on_rtsp_realm",String.format("%s/on_rtsp_realm", hookPrex)); + param.put("hook.on_record_ts",""); + param.put("hook.on_rtsp_auth",""); + param.put("hook.on_rtsp_realm",""); param.put("hook.on_server_started",String.format("%s/on_server_started", hookPrex)); - param.put("hook.on_shell_login",String.format("%s/on_shell_login", hookPrex)); + param.put("hook.on_shell_login",""); param.put("hook.on_stream_changed",String.format("%s/on_stream_changed", hookPrex)); param.put("hook.on_stream_none_reader",String.format("%s/on_stream_none_reader", hookPrex)); param.put("hook.on_stream_not_found",String.format("%s/on_stream_not_found", hookPrex)); @@ -551,6 +552,7 @@ public class MediaServerServiceImpl implements IMediaServerService { // 此参数不应大于播放器超时时间 // 优化此消息以更快的收到流注销事件 param.put("general.continue_push_ms", "3000" ); + param.put("general.publishToHls", "0" ); // 最多等待未初始化的Track时间,单位毫秒,超时之后会忽略未初始化的Track, 设置此选项优化那些音频错误的不规范流, // 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项 // param.put("general.wait_track_ready_ms", "3000" ); @@ -645,7 +647,7 @@ public class MediaServerServiceImpl implements IMediaServerService { } @Override - public void updateMediaServerKeepalive(String mediaServerId, JSONObject data) { + public void updateMediaServerKeepalive(String mediaServerId, ServerKeepaliveData data) { MediaServerItem mediaServerItem = getOne(mediaServerId); if (mediaServerItem == null) { // 缓存不存在,从数据库查询,如果数据库不存在则是错误的 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index bfe8f045..e3a00188 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -7,7 +7,6 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.OnPublishHookParam; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -16,7 +15,6 @@ import com.genersoft.iot.vmp.service.IMediaService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; @Service public class MediaServiceImpl implements IMediaService { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index d8c7f484..3183e3de 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -6,12 +6,10 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.GbStream; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.TreeType; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.service.IGbStreamService; @@ -27,7 +25,6 @@ import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.github.pagehelper.PageInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,9 +34,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; -import java.net.InetAddress; import java.util.*; /** @@ -389,18 +384,18 @@ public class StreamProxyServiceImpl implements IStreamProxyService { String type = "PULL"; // 发送redis消息 - List mediaItems = redisCatchStorage.getStreams(mediaServerId, type); - if (mediaItems.size() > 0) { - for (MediaItem mediaItem : mediaItems) { + List onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, type); + if (onStreamChangedHookParams.size() > 0) { + for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) { JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", mediaItem.getApp()); - jsonObject.put("stream", mediaItem.getStream()); + jsonObject.put("app", onStreamChangedHookParam.getApp()); + jsonObject.put("stream", onStreamChangedHookParam.getStream()); jsonObject.put("register", false); jsonObject.put("mediaServerId", mediaServerId); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); // 移除redis内流的信息 - redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream()); + redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); } } } @@ -418,7 +413,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { private void syncPullStream(String mediaServerId){ MediaServerItem mediaServer = mediaServerService.getOne(mediaServerId); if (mediaServer != null) { - List allPullStream = redisCatchStorage.getStreams(mediaServerId, "PULL"); + List allPullStream = redisCatchStorage.getStreams(mediaServerId, "PULL"); if (allPullStream.size() > 0) { zlmresTfulUtils.getMediaList(mediaServer, jsonObject->{ Map stringStreamInfoMap = new HashMap<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index bd2d5357..ffbcb42b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -11,6 +11,8 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; @@ -29,7 +31,6 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; import java.util.*; import java.util.stream.Collectors; @@ -93,8 +94,8 @@ public class StreamPushServiceImpl implements IStreamPushService { Map result = new HashMap<>(); - List mediaItems = JSON.parseObject(jsonData, new TypeReference>() {}); - for (MediaItem item : mediaItems) { + List onStreamChangedHookParams = JSON.parseObject(jsonData, new TypeReference>() {}); + for (OnStreamChangedHookParam item : onStreamChangedHookParams) { // 不保存国标推理以及拉流代理的流 if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() @@ -112,7 +113,7 @@ public class StreamPushServiceImpl implements IStreamPushService { return new ArrayList<>(result.values()); } @Override - public StreamPushItem transform(MediaItem item) { + public StreamPushItem transform(OnStreamChangedHookParam item) { StreamPushItem streamPushItem = new StreamPushItem(); streamPushItem.setApp(item.getApp()); streamPushItem.setMediaServerId(item.getMediaServerId()); @@ -206,8 +207,8 @@ public class StreamPushServiceImpl implements IStreamPushService { List pushList = getPushList(mediaServerId); Map pushItemMap = new HashMap<>(); // redis记录 - List mediaItems = redisCatchStorage.getStreams(mediaServerId, "PUSH"); - Map streamInfoPushItemMap = new HashMap<>(); + List onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH"); + Map streamInfoPushItemMap = new HashMap<>(); if (pushList.size() > 0) { for (StreamPushItem streamPushItem : pushList) { if (ObjectUtils.isEmpty(streamPushItem.getGbId())) { @@ -215,9 +216,9 @@ public class StreamPushServiceImpl implements IStreamPushService { } } } - if (mediaItems.size() > 0) { - for (MediaItem mediaItem : mediaItems) { - streamInfoPushItemMap.put(mediaItem.getApp() + mediaItem.getStream(), mediaItem); + if (onStreamChangedHookParams.size() > 0) { + for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) { + streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam); } } zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ @@ -258,19 +259,19 @@ public class StreamPushServiceImpl implements IStreamPushService { } } - Collection offlineMediaItemList = streamInfoPushItemMap.values(); - if (offlineMediaItemList.size() > 0) { + Collection offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values(); + if (offlineOnStreamChangedHookParamList.size() > 0) { String type = "PUSH"; - for (MediaItem offlineMediaItem : offlineMediaItemList) { + for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) { JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", offlineMediaItem.getApp()); - jsonObject.put("stream", offlineMediaItem.getStream()); + jsonObject.put("app", offlineOnStreamChangedHookParam.getApp()); + jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream()); jsonObject.put("register", false); jsonObject.put("mediaServerId", mediaServerId); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); // 移除redis内流的信息 - redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineMediaItem.getApp(), offlineMediaItem.getStream()); + redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream()); } } })); @@ -288,15 +289,15 @@ public class StreamPushServiceImpl implements IStreamPushService { // 发送流停止消息 String type = "PUSH"; // 发送redis消息 - List streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); + List streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); if (streamInfoList.size() > 0) { - for (MediaItem mediaItem : streamInfoList) { + for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) { // 移除redis内流的信息 - redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream()); + redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", mediaItem.getApp()); - jsonObject.put("stream", mediaItem.getStream()); + jsonObject.put("app", onStreamChangedHookParam.getApp()); + jsonObject.put("stream", onStreamChangedHookParam.getStream()); jsonObject.put("register", false); jsonObject.put("mediaServerId", mediaServerId); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java index d1730275..3e73fc05 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java @@ -5,7 +5,7 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; -import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -66,20 +66,20 @@ public class RedisStreamMsgListener implements MessageListener { String stream = steamMsgJson.getString("stream"); boolean register = steamMsgJson.getBoolean("register"); String mediaServerId = steamMsgJson.getString("mediaServerId"); - MediaItem mediaItem = new MediaItem(); - mediaItem.setSeverId(serverId); - mediaItem.setApp(app); - mediaItem.setStream(stream); - mediaItem.setRegist(register); - mediaItem.setMediaServerId(mediaServerId); - mediaItem.setCreateStamp(System.currentTimeMillis()/1000); - mediaItem.setAliveSecond(0L); - mediaItem.setTotalReaderCount("0"); - mediaItem.setOriginType(0); - mediaItem.setOriginTypeStr("0"); - mediaItem.setOriginTypeStr("unknown"); + OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam(); + onStreamChangedHookParam.setSeverId(serverId); + onStreamChangedHookParam.setApp(app); + onStreamChangedHookParam.setStream(stream); + onStreamChangedHookParam.setRegist(register); + onStreamChangedHookParam.setMediaServerId(mediaServerId); + onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000); + onStreamChangedHookParam.setAliveSecond(0L); + onStreamChangedHookParam.setTotalReaderCount("0"); + onStreamChangedHookParam.setOriginType(0); + onStreamChangedHookParam.setOriginTypeStr("0"); + onStreamChangedHookParam.setOriginTypeStr("unknown"); if (register) { - zlmMediaListManager.addPush(mediaItem); + zlmMediaListManager.addPush(onStreamChangedHookParam); }else { zlmMediaListManager.removeMedia(app, stream); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 81f36916..f9a223c7 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -5,9 +5,9 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.SystemAllInfo; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; -import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.bean.ThirdPartyGB; import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; @@ -131,7 +131,7 @@ public interface IRedisCatchStorage { * @param app * @param streamId */ - void addStream(MediaServerItem mediaServerItem, String type, String app, String streamId, MediaItem item); + void addStream(MediaServerItem mediaServerItem, String type, String app, String streamId, OnStreamChangedHookParam item); /** * 移除流信息从redis @@ -165,7 +165,7 @@ public interface IRedisCatchStorage { */ ThirdPartyGB queryMemberNoGBId(String queryKey); - List getStreams(String mediaServerId, String pull); + List getStreams(String mediaServerId, String pull); /** * 将device信息写入redis @@ -191,7 +191,7 @@ public interface IRedisCatchStorage { void resetAllSN(); - MediaItem getStreamInfo(String app, String streamId, String mediaServerId); + OnStreamChangedHookParam getStreamInfo(String app, String streamId, String mediaServerId); void addCpuInfo(double cpuInfo); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 8fcf373a..ca2e348c 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -7,7 +7,7 @@ import com.genersoft.iot.vmp.common.SystemAllInfo; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; @@ -573,14 +573,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } @Override - public void addStream(MediaServerItem mediaServerItem, String type, String app, String streamId, MediaItem mediaItem) { + public void addStream(MediaServerItem mediaServerItem, String type, String app, String streamId, OnStreamChangedHookParam onStreamChangedHookParam) { // 查找是否使用了callID StreamAuthorityInfo streamAuthorityInfo = getStreamAuthorityInfo(app, streamId); String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId(); if (streamAuthorityInfo != null) { - mediaItem.setCallId(streamAuthorityInfo.getCallId()); + onStreamChangedHookParam.setCallId(streamAuthorityInfo.getCallId()); } - RedisUtil.set(key, mediaItem); + RedisUtil.set(key, onStreamChangedHookParam); } @Override @@ -638,13 +638,13 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } @Override - public List getStreams(String mediaServerId, String type) { - List result = new ArrayList<>(); + public List getStreams(String mediaServerId, String type) { + List result = new ArrayList<>(); String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_*_*_" + mediaServerId; List streams = RedisUtil.scan(key); for (Object stream : streams) { - MediaItem mediaItem = (MediaItem)RedisUtil.get((String) stream); - result.add(mediaItem); + OnStreamChangedHookParam onStreamChangedHookParam = (OnStreamChangedHookParam)RedisUtil.get((String) stream); + result.add(onStreamChangedHookParam); } return result; } @@ -716,14 +716,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override - public MediaItem getStreamInfo(String app, String streamId, String mediaServerId) { + public OnStreamChangedHookParam getStreamInfo(String app, String streamId, String mediaServerId) { String scanKey = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_*_" + app + "_" + streamId + "_" + mediaServerId; - MediaItem result = null; + OnStreamChangedHookParam result = null; List keys = RedisUtil.scan(scanKey); if (keys.size() > 0) { String key = (String) keys.get(0); - result = (MediaItem)RedisUtil.get(key); + result = (OnStreamChangedHookParam)RedisUtil.get(key); } return result; diff --git a/web_src/src/components/console/ConsoleResource.vue b/web_src/src/components/console/ConsoleResource.vue index c76b270d..64c1c49e 100644 --- a/web_src/src/components/console/ConsoleResource.vue +++ b/web_src/src/components/console/ConsoleResource.vue @@ -1,28 +1,32 @@