diff --git a/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java b/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java index 93739685..4217e140 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java @@ -57,6 +57,7 @@ public class MediaServerConfig implements CommandLineRunner { @Override public void run(String... strings) throws Exception { + // TODO 获取所有的离线节点信息 mediaServerService.clearMediaServerForOnline(); MediaServerItem defaultMediaServer = mediaServerService.getDefaultMediaServer(); if (defaultMediaServer == null) { @@ -67,7 +68,11 @@ public class MediaServerConfig implements CommandLineRunner { } // 发送媒体节点变化事件 mediaServerService.syncCatchFromDatabase(); + // 获取所有的zlm, 并开启主动连接 + List all = mediaServerService.getAllFromDatabase(); + MediaServerChangeEvent event = new MediaServerChangeEvent(this); + event.setMediaServerItem(); applicationEventPublisher.publishEvent(event); // TODO 此处以下代码弃用 diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 2c421bad..aa3efceb 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -264,6 +264,9 @@ public class MediaServerServiceImpl implements IMediaServerService { } String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItemInDataBase.getId(); redisTemplate.opsForValue().set(key, mediaServerItemInDataBase); + if (mediaSerItem.isStatus()) { + resetOnlineServerItem(mediaSerItem); + } } @Override @@ -498,20 +501,18 @@ public class MediaServerServiceImpl implements IMediaServerService { if (redisTemplate.opsForZSet().score(key, serverItem.getId()) == null) { // 不存在则设置默认值 已存在则重置 redisTemplate.opsForZSet().add(key, serverItem.getId(), 0L); // 查询服务流数量 - zlmresTfulUtils.getMediaList(serverItem, null, null, "rtsp",(mediaList ->{ - Integer code = mediaList.getInteger("code"); - if (code == 0) { - JSONArray data = mediaList.getJSONArray("data"); - if (data != null) { - redisTemplate.opsForZSet().add(key, serverItem.getId(), data.size()); - } - } - })); + int count = getMediaList(serverItem); + redisTemplate.opsForZSet().add(key, serverItem.getId(), count); }else { clearRTPServer(serverItem); } } + private int getMediaList(MediaServerItem serverItem) { + + return 0; + } + @Override public void addCount(String mediaServerId) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java index bf396900..ef0c67a2 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java @@ -1,13 +1,23 @@ package com.genersoft.iot.vmp.media.zlm; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.media.event.MediaServerChangeEvent; +import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig; +import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerKeepaliveEvent; import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerStartEvent; +import org.checkerframework.checker.units.qual.A; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -15,11 +25,23 @@ import java.util.concurrent.ConcurrentHashMap; /** * 管理zlm流媒体节点的状态 */ +@Component public class ZLMMediaServerStatusManger { private final static Logger logger = LoggerFactory.getLogger(ZLMMediaServerStatusManger.class); - private Map offlineZlmMap = new ConcurrentHashMap<>(); + private final Map offlineZlmPrimaryMap = new ConcurrentHashMap<>(); + private final Map offlineZlmsecondaryMap = new ConcurrentHashMap<>(); + private final Map offlineZlmTimeMap = new ConcurrentHashMap<>(); + + @Autowired + private ZLMRESTfulUtils zlmresTfulUtils; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private DynamicTask dynamicTask; private final String type = "zlm"; @@ -31,8 +53,9 @@ public class ZLMMediaServerStatusManger { || event.getMediaServerItem().isStatus()) { return; } - logger.info("[ZLM 待上线节点变化] ID:" + event.getMediaServerItem().getId()); - offlineZlmMap.put(event.getMediaServerItem().getId(), event.getMediaServerItem()); + logger.info("[ZLM-添加待上线节点] ID:" + event.getMediaServerItem().getId()); + offlineZlmPrimaryMap.put(event.getMediaServerItem().getId(), event.getMediaServerItem()); + offlineZlmTimeMap.put(event.getMediaServerItem().getId(), System.currentTimeMillis()); } @Async("taskExecutor") @@ -43,13 +66,124 @@ public class ZLMMediaServerStatusManger { || event.getMediaServerItem().isStatus()) { return; } + MediaServerItem serverItem = mediaServerService.getOne(event.getMediaServerItem().getId()); + if (serverItem == null) { + return; + } logger.info("[ZLM-HOOK事件-服务启动] ID:" + event.getMediaServerItem().getId()); - offlineZlmMap.remove(event.getMediaServerItem().getId()); + online(serverItem); } - @Scheduled(fixedDelay = ) //每天的0点执行 - public void execute(){ + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(HookZlmServerKeepaliveEvent event) { + if (event.getMediaServerItem() == null + || !type.equals(event.getMediaServerItem().getType()) + || event.getMediaServerItem().isStatus()) { + return; + } + MediaServerItem serverItem = mediaServerService.getOne(event.getMediaServerItem().getId()); + if (serverItem == null) { + return; + } + logger.info("[ZLM-HOOK事件-心跳] ID:" + event.getMediaServerItem().getId()); + online(serverItem); + } + @Scheduled(fixedDelay = 10*1000) //每隔10秒检查一次 + public void execute(){ + // 初次加入的离线节点会在30分钟内,每间隔十秒尝试一次,30分钟后如果仍然没有上线,则每隔30分钟尝试一次连接 + if (offlineZlmPrimaryMap.isEmpty() && offlineZlmsecondaryMap.isEmpty()) { + return; + } + if (!offlineZlmPrimaryMap.isEmpty()) { + for (MediaServerItem mediaServerItem : offlineZlmPrimaryMap.values()) { + if (offlineZlmTimeMap.get(mediaServerItem.getId()) > 30*60*1000) { + offlineZlmsecondaryMap.put(mediaServerItem.getId(), mediaServerItem); + offlineZlmPrimaryMap.remove(mediaServerItem.getId()); + continue; + } + logger.info("[ZLM-尝试连接] ID:{}, 地址: {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); + JSONObject responseJson = zlmresTfulUtils.getMediaServerConfig(mediaServerItem); + ZLMServerConfig zlmServerConfig = null; + if (responseJson == null) { + logger.info("[ZLM-尝试连接]失败, ID:{}, 地址: {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); + continue; + } + JSONArray data = responseJson.getJSONArray("data"); + if (data == null || data.isEmpty()) { + logger.info("[ZLM-尝试连接]失败, ID:{}, 地址: {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); + }else { + zlmServerConfig = JSON.parseObject(JSON.toJSONString(data.get(0)), ZLMServerConfig.class); + initPort(mediaServerItem, zlmServerConfig); + online(mediaServerItem); + } + } + } + if (!offlineZlmsecondaryMap.isEmpty()) { + for (MediaServerItem mediaServerItem : offlineZlmsecondaryMap.values()) { + if (offlineZlmTimeMap.get(mediaServerItem.getId()) < 30*60*1000) { + continue; + } + logger.info("[ZLM-尝试连接] ID:{}, 地址: {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); + JSONObject responseJson = zlmresTfulUtils.getMediaServerConfig(mediaServerItem); + ZLMServerConfig zlmServerConfig = null; + if (responseJson == null) { + logger.info("[ZLM-尝试连接]失败, ID:{}, 地址: {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); + offlineZlmTimeMap.put(mediaServerItem.getId(), System.currentTimeMillis()); + continue; + } + JSONArray data = responseJson.getJSONArray("data"); + if (data == null || data.isEmpty()) { + logger.info("[ZLM-尝试连接]失败, ID:{}, 地址: {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); + offlineZlmTimeMap.put(mediaServerItem.getId(), System.currentTimeMillis()); + }else { + zlmServerConfig = JSON.parseObject(JSON.toJSONString(data.get(0)), ZLMServerConfig.class); + initPort(mediaServerItem, zlmServerConfig); + online(mediaServerItem); + } + } + } + } + + private void online(MediaServerItem mediaServerItem) { + logger.info("[ZLM-连接成功] ID:{}, 地址: {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); + offlineZlmPrimaryMap.remove(mediaServerItem.getId()); + offlineZlmsecondaryMap.remove(mediaServerItem.getId()); + offlineZlmTimeMap.remove(mediaServerItem.getId()); + if (!mediaServerItem.isStatus()) { + mediaServerItem.setStatus(true); + mediaServerService.update(mediaServerItem); + } + // 设置两次心跳未收到则认为zlm离线 + String key = "zlm-keepalive-" + mediaServerItem.getId(); + dynamicTask.startDelay(key, ()->{ + logger.warn("[ZLM-心跳超时] ID:{}", mediaServerItem.getId()); + mediaServerItem.setStatus(false); + offlineZlmPrimaryMap.put(mediaServerItem.getId(), mediaServerItem); + offlineZlmTimeMap.put(mediaServerItem.getId(), System.currentTimeMillis()); + }, (int)(mediaServerItem.getHookAliveInterval() * 2 * 1000)); + } + private void initPort(MediaServerItem mediaServerItem, ZLMServerConfig zlmServerConfig) { + if (mediaServerItem.getHttpSSlPort() == 0) { + mediaServerItem.setHttpSSlPort(zlmServerConfig.getHttpSSLport()); + } + if (mediaServerItem.getRtmpPort() == 0) { + mediaServerItem.setRtmpPort(zlmServerConfig.getRtmpPort()); + } + if (mediaServerItem.getRtmpSSlPort() == 0) { + mediaServerItem.setRtmpSSlPort(zlmServerConfig.getRtmpSslPort()); + } + if (mediaServerItem.getRtspPort() == 0) { + mediaServerItem.setRtspPort(zlmServerConfig.getRtspPort()); + } + if (mediaServerItem.getRtspSSLPort() == 0) { + mediaServerItem.setRtspSSLPort(zlmServerConfig.getRtspSSlport()); + } + if (mediaServerItem.getRtpProxyPort() == 0) { + mediaServerItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort()); + } + mediaServerItem.setHookAliveInterval(zlmServerConfig.getHookAliveInterval()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/HookZlmServerKeepaliveEvent.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/HookZlmServerKeepaliveEvent.java new file mode 100644 index 00000000..aa6431f5 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/HookZlmServerKeepaliveEvent.java @@ -0,0 +1,24 @@ +package com.genersoft.iot.vmp.media.zlm.event; + +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import org.springframework.context.ApplicationEvent; + +/** + * zlm 心跳事件 + */ +public class HookZlmServerKeepaliveEvent extends ApplicationEvent { + + public HookZlmServerKeepaliveEvent(Object source) { + super(source); + } + + private MediaServerItem mediaServerItem; + + public MediaServerItem getMediaServerItem() { + return mediaServerItem; + } + + public void setMediaServerItem(MediaServerItem mediaServerItem) { + this.mediaServerItem = mediaServerItem; + } +}