diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java index f3d542c8..2f62287e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java @@ -1,9 +1,7 @@ package com.genersoft.iot.vmp.gb28181; import com.genersoft.iot.vmp.conf.SipConfig; -import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.ISIPProcessorObserver; -import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.SipStackImpl; import org.slf4j.Logger; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index fd0cfdc9..9495e9de 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.gb28181.event.offline.OfflineEvent; import com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire.PlatformKeepaliveExpireEvent; import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformNotRegisterEvent; import com.genersoft.iot.vmp.media.zlm.event.ZLMOfflineEvent; +import com.genersoft.iot.vmp.media.zlm.event.ZLMOnlineEvent; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Component; @@ -73,5 +74,10 @@ public class EventPublisher { outEvent.setMediaServerId(mediaServerId); applicationEventPublisher.publishEvent(outEvent); } - + + public void zlmOnlineEventPublish(String mediaServerId) { + ZLMOnlineEvent outEvent = new ZLMOnlineEvent(this); + outEvent.setMediaServerId(mediaServerId); + applicationEventPublisher.publishEvent(outEvent); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 70655585..8c1239e8 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -179,29 +179,33 @@ public class ZLMHttpHookListener { public ResponseEntity onPublish(@RequestBody JSONObject json) { logger.debug("[ ZLM HOOK ]on_publish API调用,参数:" + json.toString()); - + JSONObject ret = new JSONObject(); + ret.put("code", 0); + ret.put("msg", "success"); + ret.put("enableHls", true); + ret.put("enableMP4", userSetup.isRecordPushLive()); String mediaServerId = json.getString("mediaServerId"); ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json); if (subscribe != null) { MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); if (mediaInfo != null) { subscribe.response(mediaInfo, json); + }else { + ret.put("code", 1); + ret.put("msg", "zlm not register"); } } String app = json.getString("app"); String stream = json.getString("stream"); StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(stream); - JSONObject ret = new JSONObject(); + // 录像回放时不进行录像下载 if (streamInfo != null) { ret.put("enableMP4", false); }else { ret.put("enableMP4", userSetup.isRecordPushLive()); } - ret.put("code", 0); - ret.put("msg", "success"); - ret.put("enableHls", true); - ret.put("enableMP4", userSetup.isRecordPushLive()); + return new ResponseEntity(ret.toString(), HttpStatus.OK); } @@ -340,37 +344,38 @@ public class ZLMHttpHookListener { if (!"rtp".equals(app)){ String type = OriginType.values()[item.getOriginType()].getType(); MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); - if (regist) { - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks); - redisCatchStorage.addStream(mediaServerItem, type, app, streamId, streamInfo); - if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() - || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() - || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { - zlmMediaListManager.addPush(item); - } - }else { - // 兼容流注销时类型错误的问题,等zlm更新后删除 - StreamPushItem streamPushItem = streamPushService.getPush(app, streamId); - if (streamPushItem != null) { - type = "PUSH"; - }else { - StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(app, streamId); - if (streamProxyByAppAndStream != null) { - type = "PULL"; + if (mediaServerItem != null){ + if (regist) { + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks); + redisCatchStorage.addStream(mediaServerItem, type, app, streamId, streamInfo); + if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() + || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() + || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { + zlmMediaListManager.addPush(item); } + }else { + // 兼容流注销时类型错误的问题,等zlm更新后删除 + StreamPushItem streamPushItem = streamPushService.getPush(app, streamId); + if (streamPushItem != null) { + type = "PUSH"; + }else { + StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(app, streamId); + if (streamProxyByAppAndStream != null) { + type = "PULL"; + } + } + zlmMediaListManager.removeMedia(app, streamId); + redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, streamId); } - zlmMediaListManager.removeMedia(app, streamId); - redisCatchStorage.removeStream(mediaServerItem, type, app, streamId); + // 发送流变化redis消息 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetup.getServerId()); + jsonObject.put("app", app); + jsonObject.put("stream", streamId); + jsonObject.put("register", regist); + jsonObject.put("mediaServerId", mediaServerId); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); } - - // 发送流变化redis消息 - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetup.getServerId()); - jsonObject.put("app", app); - jsonObject.put("stream", streamId); - jsonObject.put("register", regist); - jsonObject.put("mediaServerId", mediaServerId); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java index f185d824..5b7ba1cc 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java @@ -141,7 +141,6 @@ public class ZLMMediaListManager { }else { gbStreamMapper.add(transform); } - } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java index 5555617f..4315c8d2 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.MediaConfig; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamProxyService; @@ -17,6 +18,7 @@ import org.springframework.core.annotation.Order; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; import java.util.*; @@ -37,6 +39,9 @@ public class ZLMRunner implements CommandLineRunner { @Autowired private IStreamProxyService streamProxyService; + @Autowired + private EventPublisher publisher; + @Autowired private IMediaServerService mediaServerService; @@ -117,7 +122,7 @@ public class ZLMRunner implements CommandLineRunner { @Async public void connectZlmServer(MediaServerItem mediaServerItem){ - ZLMServerConfig zlmServerConfig = getMediaServerConfig(mediaServerItem); + ZLMServerConfig zlmServerConfig = getMediaServerConfig(mediaServerItem, 1); if (zlmServerConfig != null) { zlmServerConfig.setIp(mediaServerItem.getIp()); zlmServerConfig.setHttpPort(mediaServerItem.getHttpPort()); @@ -126,7 +131,7 @@ public class ZLMRunner implements CommandLineRunner { } } - public ZLMServerConfig getMediaServerConfig(MediaServerItem mediaServerItem) { + public ZLMServerConfig getMediaServerConfig(MediaServerItem mediaServerItem, int index) { if (startGetMedia == null) { return null;} if (!mediaServerItem.isDefaultServer() && mediaServerService.getOne(mediaServerItem.getId()) == null) { return null; @@ -143,14 +148,19 @@ public class ZLMRunner implements CommandLineRunner { ZLMServerConfig.setIp(mediaServerItem.getIp()); } } else { - logger.error("[ {} ]-[ {}:{} ]主动连接失败失败, 2s后重试", - mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); + logger.error("[ {} ]-[ {}:{} ]第{}次主动连接失败, 2s后重试", + mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort(), index); + if (index == 1 && !StringUtils.isEmpty(mediaServerItem.getId())) { + logger.info("[ {} ]-[ {}:{} ]第{}次主动连接失败, 开始清理相关资源", + mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort(), index); + publisher.zlmOfflineEventPublish(mediaServerItem.getId()); + } try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } - ZLMServerConfig = getMediaServerConfig(mediaServerItem); + ZLMServerConfig = getMediaServerConfig(mediaServerItem, index += 1); } return ZLMServerConfig; diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java new file mode 100644 index 00000000..33a251c9 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java @@ -0,0 +1,52 @@ +package com.genersoft.iot.vmp.media.zlm.event; + +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.stereotype.Component; + +/** + * @description:设备心跳超时监听,借助redis过期特性,进行监听,监听到说明设备心跳超时,发送离线事件 + * @author: swwheihei + * @date: 2020年5月6日 上午11:35:46 + */ +@Component +public class ZLMKeepliveTimeoutListener extends KeyExpirationEventMessageListener { + + private Logger logger = LoggerFactory.getLogger(ZLMKeepliveTimeoutListener.class); + + @Autowired + private EventPublisher publisher; + + @Autowired + private UserSetup userSetup; + + public ZLMKeepliveTimeoutListener(RedisMessageListenerContainer listenerContainer) { + super(listenerContainer); + } + + /** + * 监听失效的key,key格式为keeplive_deviceId + * @param message + * @param pattern + */ + @Override + public void onMessage(Message message, byte[] pattern) { + // 获取失效的key + String expiredKey = message.toString(); + String KEEPLIVEKEY_PREFIX = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetup.getServerId() + "_"; + if(!expiredKey.startsWith(KEEPLIVEKEY_PREFIX)){ + return; + } + + String mediaServerId = expiredKey.substring(KEEPLIVEKEY_PREFIX.length(),expiredKey.length()); + + publisher.zlmOfflineEventPublish(mediaServerId); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEvent.java new file mode 100644 index 00000000..8207bdd3 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEvent.java @@ -0,0 +1,11 @@ +package com.genersoft.iot.vmp.media.zlm.event; + +/** + * zlm离线事件类 + */ +public class ZLMOfflineEvent extends ZLMEventAbstract { + + public ZLMOfflineEvent(Object source) { + super(source); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEventListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEventListener.java new file mode 100644 index 00000000..b7135527 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEventListener.java @@ -0,0 +1,44 @@ +package com.genersoft.iot.vmp.media.zlm.event; + +import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IStreamProxyService; +import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +/** + * + */ +@Component +public class ZLMOfflineEventListener implements ApplicationListener { + + private final static Logger logger = LoggerFactory.getLogger(ZLMOfflineEventListener.class); + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private IStreamPushService streamPushService; + + @Autowired + private IStreamProxyService streamProxyService; + + @Override + public void onApplicationEvent(ZLMOfflineEvent event) { + + if (logger.isDebugEnabled()) { + logger.debug("ZLM离线事件触发,ID:" + event.getMediaServerId()); + } + // 处理ZLM离线 + mediaServerService.zlmServerOffline(event.getMediaServerId()); + streamProxyService.zlmServerOffline(event.getMediaServerId()); + streamPushService.zlmServerOffline(event.getMediaServerId()); + // TODO 处理对国标的影响 + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEvent.java new file mode 100644 index 00000000..612ff9d0 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEvent.java @@ -0,0 +1,11 @@ +package com.genersoft.iot.vmp.media.zlm.event; + +/** + * zlm在线事件 + */ +public class ZLMOnlineEvent extends ZLMEventAbstract { + + public ZLMOnlineEvent(Object source) { + super(source); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEventListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEventListener.java new file mode 100644 index 00000000..5731ea02 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEventListener.java @@ -0,0 +1,65 @@ +package com.genersoft.iot.vmp.media.zlm.event; + +import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IStreamProxyService; +import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +import java.text.SimpleDateFormat; + +/** + * @description: 在线事件监听器,监听到离线后,修改设备离在线状态。 设备在线有两个来源: + * 1、设备主动注销,发送注销指令 + * 2、设备未知原因离线,心跳超时 + * @author: swwheihei + * @date: 2020年5月6日 下午1:51:23 + */ +@Component +public class ZLMOnlineEventListener implements ApplicationListener { + + private final static Logger logger = LoggerFactory.getLogger(ZLMOnlineEventListener.class); + + @Autowired + private IVideoManagerStorager storager; + + @Autowired + private RedisUtil redis; + + @Autowired + private SipConfig sipConfig; + + @Autowired + private UserSetup userSetup; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private IStreamPushService streamPushService; + + @Autowired + private IStreamProxyService streamProxyService; + + private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + @Override + public void onApplicationEvent(ZLMOnlineEvent event) { + + if (logger.isDebugEnabled()) { + logger.debug("ZLM上线事件触发,ID:" + event.getMediaServerId()); + } + streamPushService.zlmServerOnline(event.getMediaServerId()); + streamProxyService.zlmServerOnline(event.getMediaServerId()); + + + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java index 618b8241..40b2c9a6 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java @@ -78,10 +78,10 @@ public interface IStreamProxyService { /** * 新的节点加入 - * @param zlmServerConfig + * @param mediaServerId * @return */ - void zlmServerOnline(ZLMServerConfig zlmServerConfig); + void zlmServerOnline(String mediaServerId); /** * 节点离线 @@ -89,4 +89,6 @@ public interface IStreamProxyService { * @return */ void zlmServerOffline(String mediaServerId); + + void clean(); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index d8a4465f..d228a1a2 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -34,6 +34,7 @@ public interface IStreamPushService { * @return */ PageInfo getPushList(Integer page, Integer count); + List getPushList(String mediaSererId); StreamPushItem transform(MediaItem item); @@ -49,10 +50,10 @@ public interface IStreamPushService { /** * 新的节点加入 - * @param zlmServerConfig + * @param mediaServerId * @return */ - void zlmServerOnline(ZLMServerConfig zlmServerConfig); + void zlmServerOnline(String mediaServerId); /** * 节点离线 @@ -61,4 +62,5 @@ public interface IStreamPushService { */ void zlmServerOffline(String mediaServerId); + void clean(); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index e02bd3f9..7d9f7480 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -4,10 +4,10 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.SsrcConfig; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; @@ -70,6 +70,9 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR @Autowired private RedisUtil redisUtil; + @Autowired + private EventPublisher publisher; + @Autowired JedisUtil jedisUtil; @@ -312,8 +315,6 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR return mediaServerMapper.update(mediaSerItem); } - - /** * 处理zlm上线 * @param zlmServerConfig zlm上线携带的参数 @@ -353,28 +354,31 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR if (serverItem.getRtpProxyPort() == 0) { serverItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort()); } - if (StringUtils.isEmpty(serverItem.getId())) { - serverItem.setId(zlmServerConfig.getGeneralMediaServerId()); - } serverItem.setStatus(true); + if (StringUtils.isEmpty(serverItem.getId())) { serverItem.setId(zlmServerConfig.getGeneralMediaServerId()); mediaServerMapper.updateByHostAndPort(serverItem); }else { mediaServerMapper.update(serverItem); } - String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + serverItem.getId(); + String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + zlmServerConfig.getGeneralMediaServerId(); if (redisUtil.get(key) == null) { - SsrcConfig ssrcConfig = new SsrcConfig(serverItem.getId(), null, sipConfig.getDomain()); + SsrcConfig ssrcConfig = new SsrcConfig(zlmServerConfig.getGeneralMediaServerId(), null, sipConfig.getDomain()); serverItem.setSsrcConfig(ssrcConfig); - redisUtil.set(key, serverItem); + }else { + MediaServerItem mediaServerItemInRedis = (MediaServerItem)redisUtil.get(key); + serverItem.setSsrcConfig(mediaServerItemInRedis.getSsrcConfig()); } - + redisUtil.set(key, serverItem); resetOnlineServerItem(serverItem); updateMediaServerKeepalive(serverItem.getId(), null); setZLMConfig(serverItem); + publisher.zlmOnlineEventPublish(serverItem.getId()); + } + @Override public void zlmServerOffline(String mediaServerId) { delete(mediaServerId); @@ -567,6 +571,10 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR @Override public void updateMediaServerKeepalive(String mediaServerId, JSONObject data) { MediaServerItem mediaServerItem = getOne(mediaServerId); + if (mediaServerItem == null) { + logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息"); + return; + } String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetup.getServerId() + "_" + mediaServerId; int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2; redisUtil.set(key, data, hookAliveInterval); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 907893d1..19bf13a3 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; @@ -28,8 +29,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; -import java.util.ArrayList; -import java.util.List; +import java.util.*; /** * 视频代理业务 @@ -54,6 +54,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private UserSetup userSetup; + @Autowired private GbStreamMapper gbStreamMapper; @@ -160,6 +163,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { }else { mediaServerItem = mediaServerService.getOne(param.getMediaServerId()); } + if (mediaServerItem == null) { + return null; + } if ("default".equals(param.getType())){ result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl(), param.isEnable_hls(), param.isEnable_mp4(), param.getRtp_type()); @@ -244,7 +250,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } } } - return result; } @@ -255,18 +260,41 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } @Override - public void zlmServerOnline(ZLMServerConfig zlmServerConfig) { - + public void zlmServerOnline(String mediaServerId) { + zlmServerOffline(mediaServerId); } @Override public void zlmServerOffline(String mediaServerId) { // 移除开启了无人观看自动移除的流 + List streamProxyItemList = streamProxyMapper.selecAutoRemoveItemByMediaServerId(mediaServerId); + if (streamProxyItemList.size() > 0) { + gbStreamMapper.batchDel(streamProxyItemList); + } streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId); // 其他的流设置未启用 streamProxyMapper.updateStatus(false, mediaServerId); - // 移除redis内流的信息 - redisCatchStorage.removeStream(mediaServerId, "PULL"); + String type = "PULL"; + + // 发送redis消息 + List streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); + if (streamInfoList.size() > 0) { + for (StreamInfo streamInfo : streamInfoList) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetup.getServerId()); + jsonObject.put("app", streamInfo.getApp()); + jsonObject.put("stream", streamInfo.getStreamId()); + jsonObject.put("register", false); + jsonObject.put("mediaServerId", mediaServerId); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + // 移除redis内流的信息 + redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId()); + } + } + } + + @Override + public void clean() { } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 7c17c2a1..dcca0e5b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -3,11 +3,15 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.OriginType; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; @@ -20,10 +24,7 @@ import com.github.pagehelper.PageInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; @Service public class StreamPushServiceImpl implements IStreamPushService { @@ -43,6 +44,9 @@ public class StreamPushServiceImpl implements IStreamPushService { @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private UserSetup userSetup; + @Autowired private IMediaServerService mediaServerService; @@ -56,7 +60,9 @@ public class StreamPushServiceImpl implements IStreamPushService { for (MediaItem item : mediaItems) { // 不保存国标推理以及拉流代理的流 - if (item.getOriginType() == 1 || item.getOriginType() == 2 || item.getOriginType() == 8) { + if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() + || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() + || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { String key = item.getApp() + "_" + item.getStream(); StreamPushItem streamPushItem = result.get(key); if (streamPushItem == null) { @@ -97,6 +103,11 @@ public class StreamPushServiceImpl implements IStreamPushService { return new PageInfo<>(all); } + @Override + public List getPushList(String mediaServerId) { + return streamPushMapper.selectAllByMediaServerId(mediaServerId); + } + @Override public boolean saveToGB(GbStream stream) { stream.setStreamType("push"); @@ -137,17 +148,84 @@ public class StreamPushServiceImpl implements IStreamPushService { } @Override - public void zlmServerOnline(ZLMServerConfig zlmServerConfig) { - // 似乎没啥需要做的 + public void zlmServerOnline(String mediaServerId) { + // 同步zlm推流信息 + MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + if (mediaServerItem == null) { + return; + } + List pushList = getPushList(mediaServerId); + if (pushList.size() > 0) { + Map pushItemMap = new HashMap<>(); + for (StreamPushItem streamPushItem : pushList) { + pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem); + } + zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ + if (mediaList == null) return; + String dataStr = mediaList.getString("data"); + + Integer code = mediaList.getInteger("code"); + List streamPushItems = null; + if (code == 0 ) { + if (dataStr != null) { + streamPushItems = handleJSON(dataStr, mediaServerItem); + } + } + + if (streamPushItems != null) { + for (StreamPushItem streamPushItem : streamPushItems) { + pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); + } + } + Collection offlinePushItems = pushItemMap.values(); + if (offlinePushItems.size() > 0) { + String type = "PUSH"; + streamPushMapper.delAll(new ArrayList<>(offlinePushItems)); + for (StreamPushItem offlinePushItem : offlinePushItems) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetup.getServerId()); + jsonObject.put("app", offlinePushItem.getApp()); + jsonObject.put("stream", offlinePushItem.getStream()); + jsonObject.put("register", false); + jsonObject.put("mediaServerId", mediaServerId); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + // 移除redis内流的信息 + redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlinePushItem.getApp(), offlinePushItem.getStream()); + } + } + })); + } } @Override public void zlmServerOffline(String mediaServerId) { - // 移除没有serverId的推流 + List streamPushItems = streamPushMapper.selectAllByMediaServerId(mediaServerId); + // 移除没有GBId的推流 streamPushMapper.deleteWithoutGBId(mediaServerId); + gbStreamMapper.deleteWithoutGBId("push", mediaServerId); // 其他的流设置未启用 gbStreamMapper.updateStatusByMediaServerId(mediaServerId, false); - // 移除redis内流的信息 - redisCatchStorage.removeStream(mediaServerId, "PUSH"); + // 发送流停止消息 + String type = "PUSH"; + // 发送redis消息 + List streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); + if (streamInfoList.size() > 0) { + for (StreamInfo streamInfo : streamInfoList) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetup.getServerId()); + jsonObject.put("app", streamInfo.getApp()); + jsonObject.put("stream", streamInfo.getStreamId()); + jsonObject.put("register", false); + jsonObject.put("mediaServerId", mediaServerId); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + // 移除redis内流的信息 + redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId()); + } + } + } + + @Override + public void clean() { + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 0803dd64..4f240d86 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -140,11 +140,11 @@ public interface IRedisCatchStorage { /** * 移除流信息从redis - * @param mediaServerItem + * @param mediaServerId * @param app * @param streamId */ - void removeStream(MediaServerItem mediaServerItem, String type, String app, String streamId); + void removeStream(String mediaServerId, String type, String app, String streamId); /** @@ -167,4 +167,6 @@ public interface IRedisCatchStorage { * @return */ ThirdPartyGB queryMemberNoGBId(String queryKey); + + List getStreams(String mediaServerId, String pull); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java index f1599255..9757b13d 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java @@ -65,4 +65,18 @@ public interface GbStreamMapper { "SET status=${status} " + "WHERE mediaServerId=#{mediaServerId} ") void updateStatusByMediaServerId(String mediaServerId, boolean status); + + @Select("SELECT * FROM gb_stream WHERE mediaServerId=#{mediaServerId}") + void delByMediaServerId(String mediaServerId); + + @Delete("DELETE FROM gb_stream WHERE streamType=#{type} AND gbId=NULL AND mediaServerId=#{mediaServerId}") + void deleteWithoutGBId(String type, String mediaServerId); + + @Delete("") + void batchDel(List streamProxyItemList); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java index 82520ec4..b6e1ba1a 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java @@ -62,6 +62,9 @@ public interface StreamProxyMapper { "WHERE mediaServerId=#{mediaServerId}") void updateStatus(boolean status, String mediaServerId); - @Delete("DELETE FROM stream_proxy WHERE mediaServerId=#{mediaServerId}") + @Delete("DELETE FROM stream_proxy WHERE enable_remove_none_reader=true AND mediaServerId=#{mediaServerId}") void deleteAutoRemoveItemByMediaServerId(String mediaServerId); + + @Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable_remove_none_reader=true AND st.mediaServerId=#{mediaServerId} order by st.createTime desc") + List selecAutoRemoveItemByMediaServerId(String mediaServerId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java index 9fe6ebf2..c5b22f8b 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java @@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import org.apache.ibatis.annotations.*; import org.springframework.stereotype.Repository; +import java.util.Collection; import java.util.List; @Mapper @@ -31,6 +32,14 @@ public interface StreamPushMapper { @Delete("DELETE FROM stream_push WHERE app=#{app} AND stream=#{stream}") int del(String app, String stream); + @Delete("") + int delAll(List streamPushItems); + @Select("SELECT st.*, pgs.gbId, pgs.status, pgs.name, pgs.longitude, pgs.latitude FROM stream_push st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream") List selectAll(); @@ -56,4 +65,7 @@ public interface StreamPushMapper { @Delete("DELETE FROM stream_push WHERE mediaServerId=#{mediaServerId}") void deleteWithoutGBId(String mediaServerId); + @Select("SELECT * FROM stream_push WHERE mediaServerId=#{mediaServerId}") + List selectAllByMediaServerId(String mediaServerId); + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 6adc05df..af9a206c 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -338,8 +338,8 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } @Override - public void removeStream(MediaServerItem mediaServerItem, String type, String app, String streamId) { - String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId(); + public void removeStream(String mediaServerId, String type, String app, String streamId) { + String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerId; redis.del(key); } @@ -365,4 +365,16 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { redis.del((String) stream); } } + + @Override + public List getStreams(String mediaServerId, String type) { + List result = new ArrayList<>(); + String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_*_*_" + mediaServerId; + List streams = redis.scan(key); + for (Object stream : streams) { + StreamInfo streamInfo = (StreamInfo)redis.get((String) stream); + result.add(streamInfo); + } + return result; + } }