diff --git a/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java b/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java index 4217e140..3be8e4c4 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java @@ -1,63 +1,41 @@ package com.genersoft.iot.vmp.media; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONArray; -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.MediaConfig; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.media.event.MediaServerChangeEvent; -import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.context.ApplicationEventPublisher; import org.springframework.core.annotation.Order; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +/** + * 启动是从配置文件加载节点信息,以及发送个节点状态管理去控制节点状态 + */ @Component @Order(value=12) public class MediaServerConfig implements CommandLineRunner { private final static Logger logger = LoggerFactory.getLogger(MediaServerConfig.class); - private Map startGetMedia; - @Autowired private ApplicationEventPublisher applicationEventPublisher; - @Autowired - private ZlmHttpHookSubscribe hookSubscribe; - - @Autowired - private EventPublisher publisher; - @Autowired private IMediaServerService mediaServerService; @Autowired private MediaConfig mediaConfig; - @Autowired - private DynamicTask dynamicTask; - @Override public void run(String... strings) throws Exception { - // TODO 获取所有的离线节点信息 + // 清理所有在线节点的缓存信息 mediaServerService.clearMediaServerForOnline(); MediaServerItem defaultMediaServer = mediaServerService.getDefaultMediaServer(); if (defaultMediaServer == null) { @@ -70,118 +48,9 @@ public class MediaServerConfig implements CommandLineRunner { mediaServerService.syncCatchFromDatabase(); // 获取所有的zlm, 并开启主动连接 List all = mediaServerService.getAllFromDatabase(); - + logger.info("[媒体节点] 加载节点列表, 共{}个节点", all.size()); MediaServerChangeEvent event = new MediaServerChangeEvent(this); - event.setMediaServerItem(); + event.setMediaServerItemList(all); applicationEventPublisher.publishEvent(event); - // TODO 此处以下代码弃用 - - - HookSubscribeForServerStarted hookSubscribeForServerStarted = HookSubscribeFactory.on_server_started(); - // 订阅 媒体节点启动事件, 新的媒体节点也会从这里进入系统 - hookSubscribe.addSubscribe(hookSubscribeForServerStarted, - (mediaServerItem, hookParam)->{ - ZLMServerConfig zlmServerConfig = (ZLMServerConfig)hookParam; - if (zlmServerConfig !=null ) { - if (startGetMedia != null) { - startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId()); - if (startGetMedia.isEmpty()) { - hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started()); - } - } - } - }); - - // 获取zlm信息 - logger.info("[zlm] 等待默认zlm中..."); - - // 获取所有的zlm, 并开启主动连接 - List all = mediaServerService.getAllFromDatabase(); - Map allMap = new HashMap<>(); - mediaServerService.updateVmServer(all); - if (all.size() == 0) { - all.add(mediaConfig.getMediaSerItem()); - } - for (MediaServerItem mediaServerItem : all) { - if (startGetMedia == null) { - startGetMedia = new ConcurrentHashMap<>(); - } - startGetMedia.put(mediaServerItem.getId(), true); - connectZlmServer(mediaServerItem); - allMap.put(mediaServerItem.getId(), mediaServerItem); - } - String taskKey = "zlm-connect-timeout"; - dynamicTask.startDelay(taskKey, ()->{ - if (startGetMedia != null && startGetMedia.size() > 0) { - Set allZlmId = startGetMedia.keySet(); - for (String id : allZlmId) { - logger.error("[ {} ]]主动连接失败,不再尝试连接", id); - } - startGetMedia = null; - } - // 获取redis中所有的zlm - List allInRedis = mediaServerService.getAll(); - for (MediaServerItem mediaServerItem : allInRedis) { - if (!allMap.containsKey(mediaServerItem.getId())) { - mediaServerService.delete(mediaServerItem.getId()); - } - } - }, 60 * 1000 ); - } - - @Async("taskExecutor") - public void connectZlmServer(MediaServerItem mediaServerItem){ - String connectZlmServerTaskKey = "connect-zlm-" + mediaServerItem.getId(); - ZLMServerConfig zlmServerConfigFirst = getMediaServerConfig(mediaServerItem); - if (zlmServerConfigFirst != null) { - zlmServerConfigFirst.setIp(mediaServerItem.getIp()); - zlmServerConfigFirst.setHttpPort(mediaServerItem.getHttpPort()); - startGetMedia.remove(mediaServerItem.getId()); - if (startGetMedia.size() == 0) { - hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started()); - } - mediaServerService.zlmServerOnline(zlmServerConfigFirst); - }else { - logger.info("[ {} ]-[ {}:{} ]主动连接失败, 清理相关资源, 开始尝试重试连接", - mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); - publisher.zlmOfflineEventPublish(mediaServerItem.getId()); - } - - dynamicTask.startCron(connectZlmServerTaskKey, ()->{ - ZLMServerConfig zlmServerConfig = getMediaServerConfig(mediaServerItem); - if (zlmServerConfig != null) { - dynamicTask.stop(connectZlmServerTaskKey); - zlmServerConfig.setIp(mediaServerItem.getIp()); - zlmServerConfig.setHttpPort(mediaServerItem.getHttpPort()); - startGetMedia.remove(mediaServerItem.getId()); - if (startGetMedia.size() == 0) { - hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started()); - } - mediaServerService.zlmServerOnline(zlmServerConfig); - } - }, 2000); - } - - public ZLMServerConfig getMediaServerConfig(MediaServerItem mediaServerItem) { - if (startGetMedia == null) { return null;} - if (!mediaServerItem.isDefaultServer() && mediaServerService.getOne(mediaServerItem.getId()) == null) { - return null; - } - if ( startGetMedia.get(mediaServerItem.getId()) == null || !startGetMedia.get(mediaServerItem.getId())) { - return null; - } - JSONObject responseJson = zlmresTfulUtils.getMediaServerConfig(mediaServerItem); - ZLMServerConfig zlmServerConfig = null; - if (responseJson != null) { - JSONArray data = responseJson.getJSONArray("data"); - if (data != null && data.size() > 0) { - zlmServerConfig = JSON.parseObject(JSON.toJSONString(data.get(0)), ZLMServerConfig.class); - } - } else { - logger.error("[ {} ]-[ {}:{} ]主动连接失败, 2s后重试", - mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); - } - return zlmServerConfig; - } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerChangeEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerChangeEvent.java index 6cf8bbfc..e1d2a94b 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerChangeEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerChangeEvent.java @@ -3,19 +3,30 @@ package com.genersoft.iot.vmp.media.event; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import org.springframework.context.ApplicationEvent; +import java.util.Arrays; +import java.util.List; + public class MediaServerChangeEvent extends ApplicationEvent { public MediaServerChangeEvent(Object source) { super(source); } - private MediaServerItem mediaServerItem; + private List mediaServerItemList; - public MediaServerItem getMediaServerItem() { - return mediaServerItem; + public List getMediaServerItemList() { + return mediaServerItemList; } - public void setMediaServerItem(MediaServerItem mediaServerItem) { - this.mediaServerItem = mediaServerItem; + public void setMediaServerItemList(List mediaServerItemList) { + this.mediaServerItemList = mediaServerItemList; + } + + public void setMediaServerItemList(MediaServerItem... mediaServerItemArray) { + this.mediaServerItemList.addAll(Arrays.asList(mediaServerItemArray)); + } + + public void setMediaServerItem(List mediaServerItemList) { + this.mediaServerItemList = mediaServerItemList; } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index b483e6b6..be090ec9 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -462,30 +462,6 @@ public class MediaServerServiceImpl implements IMediaServerService { zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort()); } - class KeepAliveTimeoutRunnable implements Runnable{ - - private MediaServerItem serverItem; - - public KeepAliveTimeoutRunnable(MediaServerItem serverItem) { - this.serverItem = serverItem; - } - - @Override - public void run() { - logger.info("[媒体服务节点心跳到期]:" + serverItem.getId()); - // 发起http请求验证zlm是否确实无法连接,如果确实无法连接则发送离线事件,否则不作处理 - JSONObject mediaServerConfig = zlmresTfulUtils.getMediaServerConfig(serverItem); - if (mediaServerConfig != null && mediaServerConfig.getInteger("code") == 0) { - logger.info("[媒体服务节点心跳到期]:{}验证后媒体服务节点仍在线,恢复心跳信息,请检查媒体服务节点是否可以正常向wvp发送心跳", serverItem.getId()); - // 添加媒体服务节点信息 - updateMediaServerKeepalive(serverItem.getId(), null); - }else { - publisher.zlmOfflineEventPublish(serverItem.getId()); - } - } - } - - @Override public void zlmServerOffline(String mediaServerId) { delete(mediaServerId); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java index ef0c67a2..0637b531 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java @@ -10,7 +10,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerKeepaliveEvent; import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerStartEvent; -import org.checkerframework.checker.units.qual.A; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -48,14 +47,18 @@ public class ZLMMediaServerStatusManger { @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaServerChangeEvent event) { - if (event.getMediaServerItem() == null - || !type.equals(event.getMediaServerItem().getType()) - || event.getMediaServerItem().isStatus()) { + if (event.getMediaServerItemList() == null + || event.getMediaServerItemList().isEmpty()) { return; } - logger.info("[ZLM-添加待上线节点] ID:" + event.getMediaServerItem().getId()); - offlineZlmPrimaryMap.put(event.getMediaServerItem().getId(), event.getMediaServerItem()); - offlineZlmTimeMap.put(event.getMediaServerItem().getId(), System.currentTimeMillis()); + for (MediaServerItem mediaServerItem : event.getMediaServerItemList()) { + if (!type.equals(mediaServerItem.getType())) { + continue; + } + logger.info("[ZLM-添加待上线节点] ID:" + mediaServerItem.getId()); + offlineZlmPrimaryMap.put(mediaServerItem.getId(), mediaServerItem); + offlineZlmTimeMap.put(mediaServerItem.getId(), System.currentTimeMillis()); + } } @Async("taskExecutor") @@ -147,11 +150,11 @@ public class ZLMMediaServerStatusManger { } private void online(MediaServerItem mediaServerItem) { - logger.info("[ZLM-连接成功] ID:{}, 地址: {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); offlineZlmPrimaryMap.remove(mediaServerItem.getId()); offlineZlmsecondaryMap.remove(mediaServerItem.getId()); offlineZlmTimeMap.remove(mediaServerItem.getId()); if (!mediaServerItem.isStatus()) { + logger.info("[ZLM-连接成功] ID:{}, 地址: {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); mediaServerItem.setStatus(true); mediaServerService.update(mediaServerItem); }