diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index be73ebd9..89ecb186 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -1,14 +1,9 @@ package com.genersoft.iot.vmp.gb28181.event.subscribe.catalog; -import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; -import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.service.IGbStreamService; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,12 +11,14 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; import javax.sip.InvalidArgumentException; import javax.sip.SipException; import java.text.ParseException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * catalog事件 @@ -43,6 +40,9 @@ public class CatalogEventLister implements ApplicationListener { @Autowired private SubscribeHolder subscribeHolder; + @Autowired + private UserSetting userSetting; + @Override public void onApplicationEvent(CatalogEvent event) { SubscribeInfo subscribe = null; @@ -93,6 +93,9 @@ public class CatalogEventLister implements ApplicationListener { } if (event.getGbStreams() != null && event.getGbStreams().size() > 0){ for (GbStream gbStream : event.getGbStreams()) { + if (gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) { + continue; + } DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform); deviceChannelList.add(deviceChannelByStream); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index f9f8fc24..b9a41a57 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -118,6 +118,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent // 离线 logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + }else { updateChannelOfflineList.add(channel); if (updateChannelOfflineList.size() > 300) { executeSaveForOffline(); @@ -126,14 +128,14 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); } - }else { - logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); } break; case CatalogEvent.VLOST: // 视频丢失 logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + }else { updateChannelOfflineList.add(channel); if (updateChannelOfflineList.size() > 300) { executeSaveForOffline(); @@ -142,14 +144,14 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); } - }else { - logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); } break; case CatalogEvent.DEFECT: // 故障 logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + }else { updateChannelOfflineList.add(channel); if (updateChannelOfflineList.size() > 300) { executeSaveForOffline(); @@ -158,8 +160,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); } - }else { - logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); } break; case CatalogEvent.ADD: diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 523b10fa..405fdd00 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; @@ -292,22 +293,24 @@ public class ZLMHttpHookListener { JSONObject json = (JSONObject) JSON.toJSON(param); taskExecutor.execute(() -> { ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json); + MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); + if (mediaInfo == null) { + logger.info("[ZLM HOOK] 流变化未找到ZLM, {}", param.getMediaServerId()); + return; + } if (subscribe != null) { - MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); - if (mediaInfo != null) { - subscribe.response(mediaInfo, json); - } + subscribe.response(mediaInfo, json); } List tracks = param.getTracks(); // TODO 重构此处逻辑 - + boolean isPush = false; if (param.isRegist()) { // 处理流注册的鉴权信息 if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal() || param.getOriginType() == OriginType.RTSP_PUSH.ordinal() || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { - + isPush = true; StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream()); if (streamAuthorityInfo == null) { streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param); @@ -329,7 +332,10 @@ public class ZLMHttpHookListener { mediaServerService.removeCount(param.getMediaServerId()); } // 设置拉流代理上线/离线 - streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream()); + int updateStatusResult = streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream()); + if (updateStatusResult > 0) { + + } if ("rtp".equals(param.getApp()) && !param.isRegist()) { StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(param.getStream()); @@ -337,7 +343,8 @@ public class ZLMHttpHookListener { redisCatchStorage.stopPlay(streamInfo); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); } else { - streamInfo = redisCatchStorage.queryPlayback(null, null, param.getStream(), null); + streamInfo = redisCatchStorage.queryPlayback(null, null, + param.getStream(), null); if (streamInfo != null) { redisCatchStorage.stopPlayback(streamInfo.getDeviceID(), streamInfo.getChannelId(), streamInfo.getStream(), null); @@ -346,48 +353,50 @@ public class ZLMHttpHookListener { } else { if (!"rtp".equals(param.getApp())) { String type = OriginType.values()[param.getOriginType()].getType(); - MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId()); - - if (mediaServerItem != null) { - if (param.isRegist()) { - StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream()); - String callId = null; - if (streamAuthorityInfo != null) { - callId = streamAuthorityInfo.getCallId(); - } - StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaServerItem, - param.getApp(), param.getStream(), tracks, callId); - param.setStreamInfo(new StreamContent(streamInfoByAppAndStream)); - redisCatchStorage.addStream(mediaServerItem, type, param.getApp(), param.getStream(), param); - if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal() - || param.getOriginType() == OriginType.RTMP_PUSH.ordinal() - || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { - param.setSeverId(userSetting.getServerId()); - zlmMediaListManager.addPush(param); - } - } else { - // 兼容流注销时类型从redis记录获取 - OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo(param.getApp(), param.getStream(), param.getMediaServerId()); - if (onStreamChangedHookParam != null) { - type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType(); - redisCatchStorage.removeStream(mediaServerItem.getId(), type, param.getApp(), param.getStream()); - } - GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); - if (gbStream != null) { + if (param.isRegist()) { + StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo( + param.getApp(), param.getStream()); + String callId = null; + if (streamAuthorityInfo != null) { + callId = streamAuthorityInfo.getCallId(); + } + StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaInfo, + param.getApp(), param.getStream(), tracks, callId); + param.setStreamInfo(new StreamContent(streamInfoByAppAndStream)); + redisCatchStorage.addStream(mediaInfo, type, param.getApp(), param.getStream(), param); + if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal() + || param.getOriginType() == OriginType.RTMP_PUSH.ordinal() + || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { + param.setSeverId(userSetting.getServerId()); + zlmMediaListManager.addPush(param); + } + } else { + // 兼容流注销时类型从redis记录获取 + OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo( + param.getApp(), param.getStream(), param.getMediaServerId()); + if (onStreamChangedHookParam != null) { + type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType(); + redisCatchStorage.removeStream(mediaInfo.getId(), type, param.getApp(), param.getStream()); + } + GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); + if (gbStream != null) { // eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); - } - zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); - } - if (type != null) { - // 发送流变化redis消息 - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", param.getApp()); - jsonObject.put("stream", param.getStream()); - jsonObject.put("register", param.isRegist()); - jsonObject.put("mediaServerId", param.getMediaServerId()); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); } + zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); + } + GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); + if (gbStream != null) { + eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist()?CatalogEvent.ON:CatalogEvent.OFF); + } + if (type != null) { + // 发送流变化redis消息 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetting.getServerId()); + jsonObject.put("app", param.getApp()); + jsonObject.put("stream", param.getStream()); + jsonObject.put("register", param.isRegist()); + jsonObject.put("mediaServerId", param.getMediaServerId()); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); } } } @@ -403,7 +412,8 @@ public class ZLMHttpHookListener { try { if (platform != null) { commanderFroPlatform.streamByeCmd(platform, sendRtpItem); - redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId()); + redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), + sendRtpItem.getCallId(), sendRtpItem.getStreamId()); } else { cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); } @@ -428,7 +438,8 @@ public class ZLMHttpHookListener { @PostMapping(value = "/on_stream_none_reader", produces = "application/json;charset=UTF-8") public JSONObject onStreamNoneReader(@RequestBody OnStreamNoneReaderHookParam param) { - logger.info("[ZLM HOOK]流无人观看:{]->{}->{}/{}" + param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); + logger.info("[ZLM HOOK]流无人观看:{]->{}->{}/{}" + param.getMediaServerId(), param.getSchema(), + param.getApp(), param.getStream()); JSONObject ret = new JSONObject(); ret.put("code", 0); // 国标类型的流 @@ -440,7 +451,8 @@ public class ZLMHttpHookListener { if (streamInfoForPlayCatch != null) { // 收到无人观看说明流也没有在往上级推送 if (redisCatchStorage.isChannelSendingRTP(streamInfoForPlayCatch.getChannelId())) { - List sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(streamInfoForPlayCatch.getChannelId()); + List sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( + streamInfoForPlayCatch.getChannelId()); if (sendRtpItems.size() > 0) { for (SendRtpItem sendRtpItem : sendRtpItems) { ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); @@ -470,7 +482,8 @@ public class ZLMHttpHookListener { return ret; } // 录像回放 - StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null, param.getStream(), null); + StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null, + param.getStream(), null); if (streamInfoForPlayBackCatch != null) { if (streamInfoForPlayBackCatch.isPause()) { ret.put("close", false); @@ -491,7 +504,8 @@ public class ZLMHttpHookListener { return ret; } // 录像下载 - StreamInfo streamInfoForDownload = redisCatchStorage.queryDownload(null, null, param.getStream(), null); + StreamInfo streamInfoForDownload = redisCatchStorage.queryDownload(null, null, + param.getStream(), null); // 进行录像下载时无人观看不断流 if (streamInfoForDownload != null) { ret.put("close", false); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResult.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResult.java index a2da561d..b327f13a 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResult.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResult.java @@ -18,6 +18,10 @@ public class HookResult { return new HookResult(0, "success"); } + public static HookResult Fail(){ + return new HookResult(-1, "fail"); + } + public int getCode() { return code; } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index facd54ca..4c681e88 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import java.util.*; @@ -43,6 +44,9 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Autowired private RedisTemplate redisTemplate; + @Autowired + private StringRedisTemplate stringRedisTemplate; + @Override public Long getCSEQ() { String key = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetting.getServerId(); @@ -913,7 +917,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { msg.append(":").append(channelId); } msg.append(" ").append(online? "ON":"OFF"); - - redisTemplate.convertAndSend(key, msg.toString()); + // 使用 RedisTemplate 发送字符串消息会导致发送的消息多带了双引号 + stringRedisTemplate.convertAndSend(key, msg.toString()); } }