From 181bf76862b7d2ccf4382bb782a9dd11cfb10c4e Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 20 Mar 2024 18:54:39 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=8A=82=E7=82=B9=E5=9C=A8?= =?UTF-8?q?=E7=BA=BF=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/conf/MediaConfig.java | 7 +- .../iot/vmp/gb28181/event/EventPublisher.java | 14 +- .../iot/vmp/media/MediaServerConfig.java | 16 +- .../media/abl/ABLMediaNodeServerService.java | 5 + .../media/event/MediaServerChangeEvent.java | 2 + .../media/event/MediaServerDeleteEvent.java | 11 + .../event/MediaServerEventAbstract.java | 2 +- .../event/MediaServerOfflineEvent.java | 4 +- .../event/MediaServerOnlineEvent.java | 4 +- .../event/MediaServerStatusEventListener.java | 11 +- .../service/IMediaNodeServerService.java | 2 + .../media/service/IMediaServerService.java | 35 +- .../service/impl/MediaServerServiceImpl.java | 349 ++++-------------- .../iot/vmp/media/zlm/SendRtpPortManager.java | 1 + .../vmp/media/zlm/ZLMHttpHookListener.java | 41 +- .../media/zlm/ZLMMediaNodeServerService.java | 40 +- .../media/zlm/ZLMMediaServerStatusManger.java | 129 ++++++- .../vmp/storager/dao/MediaServerMapper.java | 4 + .../cloudRecord/CloudRecordController.java | 4 +- .../vmp/vmanager/server/ServerController.java | 10 +- .../src/components/dialog/MediaServerEdit.vue | 11 +- web_src/src/components/service/MediaServer.js | 3 +- 数据库/abl/更新-mysql-2.7.0.sql | 2 +- 23 files changed, 334 insertions(+), 373 deletions(-) create mode 100755 src/main/java/com/genersoft/iot/vmp/media/event/MediaServerDeleteEvent.java rename src/main/java/com/genersoft/iot/vmp/media/{zlm => }/event/MediaServerEventAbstract.java (91%) rename src/main/java/com/genersoft/iot/vmp/media/{zlm => }/event/MediaServerOfflineEvent.java (61%) rename src/main/java/com/genersoft/iot/vmp/media/{zlm => }/event/MediaServerOnlineEvent.java (60%) rename src/main/java/com/genersoft/iot/vmp/media/{zlm => }/event/MediaServerStatusEventListener.java (82%) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java index 884036ae..4664310d 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java @@ -87,6 +87,9 @@ public class MediaConfig{ @Value("${media.record-path:}") private String recordPath; + @Value("${media.type:zlm}") + private String type; + public String getId() { return id; } @@ -217,8 +220,10 @@ public class MediaConfig{ mediaServerItem.setRtpPortRange(rtpPortRange); mediaServerItem.setSendRtpPortRange(rtpSendPortRange); mediaServerItem.setRecordAssistPort(recordAssistPort); - mediaServerItem.setHookAliveInterval(30.00f); + mediaServerItem.setHookAliveInterval(10f); mediaServerItem.setRecordDay(recordDay); + mediaServerItem.setStatus(false); + mediaServerItem.setType(type); if (recordPath != null) { mediaServerItem.setRecordPath(recordPath); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index c363583a..ba538f19 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -1,17 +1,19 @@ package com.genersoft.iot.vmp.gb28181.event; -import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; +import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEvent; import com.genersoft.iot.vmp.gb28181.event.device.RequestTimeoutEvent; import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; -import com.genersoft.iot.vmp.media.zlm.event.MediaServerOfflineEvent; -import com.genersoft.iot.vmp.media.zlm.event.MediaServerOnlineEvent; +import com.genersoft.iot.vmp.media.event.MediaServerOfflineEvent; +import com.genersoft.iot.vmp.media.event.MediaServerOnlineEvent; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Component; -import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEvent; - import javax.sip.TimeoutEvent; import java.util.ArrayList; import java.util.HashSet; @@ -39,7 +41,7 @@ public class EventPublisher { applicationEventPublisher.publishEvent(alarmEvent); } - public void zlmOfflineEventPublish(String mediaServerId){ + public void mediaServerOfflineEventPublish(String mediaServerId){ MediaServerOfflineEvent outEvent = new MediaServerOfflineEvent(this); outEvent.setMediaServerId(mediaServerId); applicationEventPublisher.publishEvent(outEvent); diff --git a/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java b/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java index 3be8e4c4..761d9709 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java @@ -38,11 +38,19 @@ public class MediaServerConfig implements CommandLineRunner { // 清理所有在线节点的缓存信息 mediaServerService.clearMediaServerForOnline(); MediaServerItem defaultMediaServer = mediaServerService.getDefaultMediaServer(); - if (defaultMediaServer == null) { - mediaServerService.addToDatabase(mediaConfig.getMediaSerItem()); + MediaServerItem mediaSerItemInConfig = mediaConfig.getMediaSerItem(); + if (defaultMediaServer != null && mediaSerItemInConfig.getId().equals(defaultMediaServer.getId())) { + mediaServerService.update(mediaSerItemInConfig); }else { - MediaServerItem mediaSerItem = mediaConfig.getMediaSerItem(); - mediaServerService.updateToDatabase(mediaSerItem); + if (defaultMediaServer != null) { + mediaServerService.delete(defaultMediaServer.getId()); + } + MediaServerItem mediaServerItem = mediaServerService.getOneFromDatabase(mediaSerItemInConfig.getId()); + if (mediaServerItem == null) { + mediaServerService.add(mediaSerItemInConfig); + }else { + mediaServerService.update(mediaSerItemInConfig); + } } // 发送媒体节点变化事件 mediaServerService.syncCatchFromDatabase(); diff --git a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java index cbce1d6b..445e1e6a 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java @@ -42,4 +42,9 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService { public void online(MediaServerItem mediaServerItem) { } + + @Override + public MediaServerItem checkMediaServer(String ip, int port, String secret) { + return null; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerChangeEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerChangeEvent.java index e1d2a94b..a880b85c 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerChangeEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerChangeEvent.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.media.event; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import org.springframework.context.ApplicationEvent; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -23,6 +24,7 @@ public class MediaServerChangeEvent extends ApplicationEvent { } public void setMediaServerItemList(MediaServerItem... mediaServerItemArray) { + this.mediaServerItemList = new ArrayList<>(); this.mediaServerItemList.addAll(Arrays.asList(mediaServerItemArray)); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerDeleteEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerDeleteEvent.java new file mode 100755 index 00000000..10a368b4 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerDeleteEvent.java @@ -0,0 +1,11 @@ +package com.genersoft.iot.vmp.media.event; + +/** + * zlm在线事件 + */ +public class MediaServerDeleteEvent extends MediaServerEventAbstract { + + public MediaServerDeleteEvent(Object source) { + super(source); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/MediaServerEventAbstract.java b/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerEventAbstract.java similarity index 91% rename from src/main/java/com/genersoft/iot/vmp/media/zlm/event/MediaServerEventAbstract.java rename to src/main/java/com/genersoft/iot/vmp/media/event/MediaServerEventAbstract.java index 66108fc9..91806f8d 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/MediaServerEventAbstract.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerEventAbstract.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.media.zlm.event; +package com.genersoft.iot.vmp.media.event; import org.springframework.context.ApplicationEvent; diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/MediaServerOfflineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerOfflineEvent.java similarity index 61% rename from src/main/java/com/genersoft/iot/vmp/media/zlm/event/MediaServerOfflineEvent.java rename to src/main/java/com/genersoft/iot/vmp/media/event/MediaServerOfflineEvent.java index de0ed334..ce3c5a9d 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/MediaServerOfflineEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerOfflineEvent.java @@ -1,4 +1,6 @@ -package com.genersoft.iot.vmp.media.zlm.event; +package com.genersoft.iot.vmp.media.event; + +import com.genersoft.iot.vmp.media.event.MediaServerEventAbstract; /** * zlm离线事件类 diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/MediaServerOnlineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerOnlineEvent.java similarity index 60% rename from src/main/java/com/genersoft/iot/vmp/media/zlm/event/MediaServerOnlineEvent.java rename to src/main/java/com/genersoft/iot/vmp/media/event/MediaServerOnlineEvent.java index d3a746a5..5d9bdc43 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/MediaServerOnlineEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerOnlineEvent.java @@ -1,4 +1,6 @@ -package com.genersoft.iot.vmp.media.zlm.event; +package com.genersoft.iot.vmp.media.event; + +import com.genersoft.iot.vmp.media.event.MediaServerEventAbstract; /** * zlm在线事件 diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/MediaServerStatusEventListener.java b/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerStatusEventListener.java similarity index 82% rename from src/main/java/com/genersoft/iot/vmp/media/zlm/event/MediaServerStatusEventListener.java rename to src/main/java/com/genersoft/iot/vmp/media/event/MediaServerStatusEventListener.java index fe5dfc98..0d8e38c0 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/MediaServerStatusEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/MediaServerStatusEventListener.java @@ -1,6 +1,5 @@ -package com.genersoft.iot.vmp.media.zlm.event; +package com.genersoft.iot.vmp.media.event; -import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; @@ -30,16 +29,13 @@ public class MediaServerStatusEventListener { @Autowired private IStreamProxyService streamProxyService; - @Autowired - private IMediaServerService mediaServerService; - @Autowired private IPlayService playService; @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaServerOnlineEvent event) { - logger.info("[ZLM] 上线 ID:" + event.getMediaServerId()); + logger.info("[媒体节点] 上线 ID:" + event.getMediaServerId()); streamPushService.zlmServerOnline(event.getMediaServerId()); streamProxyService.zlmServerOnline(event.getMediaServerId()); playService.zlmServerOnline(event.getMediaServerId()); @@ -49,9 +45,8 @@ public class MediaServerStatusEventListener { @EventListener public void onApplicationEvent(MediaServerOfflineEvent event) { - logger.info("[ZLM] 离线,ID:" + event.getMediaServerId()); + logger.info("[媒体节点] 离线,ID:" + event.getMediaServerId()); // 处理ZLM离线 - mediaServerService.zlmServerOffline(event.getMediaServerId()); streamProxyService.zlmServerOffline(event.getMediaServerId()); streamPushService.zlmServerOffline(event.getMediaServerId()); playService.zlmServerOffline(event.getMediaServerId()); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java index cc8b66b8..43f9fc5d 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java @@ -17,4 +17,6 @@ public interface IMediaNodeServerService { boolean checkNodeId(MediaServerItem mediaServerItem); void online(MediaServerItem mediaServerItem); + + MediaServerItem checkMediaServer(String ip, int port, String secret); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java index 0af9ba10..d6015e0a 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java @@ -1,9 +1,7 @@ package com.genersoft.iot.vmp.media.service; import com.genersoft.iot.vmp.common.CommonCallback; -import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -14,6 +12,8 @@ import java.util.List; */ public interface IMediaServerService { + List getAllOnlineList(); + List getAll(); List getAllFromDatabase(); @@ -24,24 +24,8 @@ public interface IMediaServerService { void syncCatchFromDatabase(); - /** - * 新的节点加入 - * @param zlmServerConfig - * @return - */ - void zlmServerOnline(ZLMServerConfig zlmServerConfig); - - /** - * 节点离线 - * @param mediaServerId - * @return - */ - void zlmServerOffline(String mediaServerId); - MediaServerItem getMediaServerForMinimumLoad(Boolean hasAssist); - void setZLMConfig(MediaServerItem mediaServerItem, boolean restart); - void updateVmServer(List mediaServerItemList); SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String presetSsrc, boolean ssrcCheck, @@ -70,30 +54,19 @@ public interface IMediaServerService { void add(MediaServerItem mediaSerItem); - int addToDatabase(MediaServerItem mediaSerItem); - - int updateToDatabase(MediaServerItem mediaSerItem); - void resetOnlineServerItem(MediaServerItem serverItem); - MediaServerItem checkMediaServer(String ip, int port, String secret); + MediaServerItem checkMediaServer(String ip, int port, String secret, String type); boolean checkMediaRecordServer(String ip, int port); void delete(String id); - void deleteDb(String id); - MediaServerItem getDefaultMediaServer(); - void updateMediaServerKeepalive(String mediaServerId, ServerKeepaliveData data); - - /** - * 获取负载信息 - * @return - */ MediaServerLoad getLoad(MediaServerItem mediaServerItem); List getAllWithAssistPort(); + MediaServerItem getOneFromDatabase(String id); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index be090ec9..a959de78 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -1,24 +1,17 @@ package com.genersoft.iot.vmp.media.service.impl; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONArray; -import com.alibaba.fastjson2.JSONObject; import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.media.event.MediaServerChangeEvent; +import com.genersoft.iot.vmp.media.event.MediaServerDeleteEvent; import com.genersoft.iot.vmp.media.service.IMediaNodeServerService; -import com.genersoft.iot.vmp.media.zlm.*; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; -import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig; -import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -33,15 +26,11 @@ import okhttp3.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Service; -import org.springframework.transaction.TransactionDefinition; -import org.springframework.transaction.TransactionStatus; import org.springframework.util.ObjectUtils; -import java.io.File; import java.time.LocalDateTime; import java.util.*; @@ -54,48 +43,15 @@ public class MediaServerServiceImpl implements IMediaServerService { private final static Logger logger = LoggerFactory.getLogger(MediaServerServiceImpl.class); - private final String zlmKeepaliveKeyPrefix = "zlm-keepalive_"; - - @Autowired - private SipConfig sipConfig; - @Autowired private SSRCFactory ssrcFactory; - @Value("${server.ssl.enabled:false}") - private boolean sslEnabled; - - @Value("${server.port}") - private Integer serverPort; - @Autowired private UserSetting userSetting; - @Autowired - private SendRtpPortManager sendRtpPortManager; - - @Autowired - private AssistRESTfulUtils assistRESTfulUtils; - @Autowired private MediaServerMapper mediaServerMapper; - @Autowired - private DataSourceTransactionManager dataSourceTransactionManager; - - @Autowired - private TransactionDefinition transactionDefinition; - - - @Autowired - private ZLMServerFactory zlmServerFactory; - - @Autowired - private EventPublisher publisher; - - @Autowired - private DynamicTask dynamicTask; - @Autowired private IRedisCatchStorage redisCatchStorage; @@ -108,6 +64,9 @@ public class MediaServerServiceImpl implements IMediaServerService { @Autowired private Map nodeServerServiceMap; + @Autowired + private ApplicationEventPublisher applicationEventPublisher; + /** * 初始化 @@ -259,18 +218,28 @@ public class MediaServerServiceImpl implements IMediaServerService { mediaServerMapper.update(mediaSerItem); MediaServerItem mediaServerItemInRedis = getOne(mediaSerItem.getId()); MediaServerItem mediaServerItemInDataBase = mediaServerMapper.queryOne(mediaSerItem.getId()); - if (mediaServerItemInRedis == null || !ssrcFactory.hasMediaServerSSRC(mediaSerItem.getId())) { + if (mediaServerItemInDataBase == null) { + return; + } + mediaServerItemInDataBase.setStatus(mediaSerItem.isStatus()); + if (mediaServerItemInRedis == null || !ssrcFactory.hasMediaServerSSRC(mediaServerItemInDataBase.getId())) { ssrcFactory.initMediaServerSSRC(mediaServerItemInDataBase.getId(),null); } String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItemInDataBase.getId(); redisTemplate.opsForValue().set(key, mediaServerItemInDataBase); - if (mediaSerItem.isStatus()) { - resetOnlineServerItem(mediaSerItem); + if (mediaServerItemInDataBase.isStatus()) { + resetOnlineServerItem(mediaServerItemInDataBase); + }else { + // 发送事件 + MediaServerChangeEvent event = new MediaServerChangeEvent(this); + event.setMediaServerItemList(mediaServerItemInDataBase); + applicationEventPublisher.publishEvent(event); } } + @Override - public List getAll() { + public List getAllOnlineList() { List result = new ArrayList<>(); List mediaServerKeys = RedisUtil.scan(redisTemplate, String.format("%S*", VideoManagerConstants.MEDIA_SERVER_PREFIX+ userSetting.getServerId() + "_" )); String onlineKey = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); @@ -298,6 +267,21 @@ public class MediaServerServiceImpl implements IMediaServerService { return result; } + @Override + public List getAll() { + List mediaServerList = mediaServerMapper.queryAll(); + if (mediaServerList.isEmpty()) { + return new ArrayList<>(); + } + for (MediaServerItem mediaServerItem : mediaServerList) { + MediaServerItem mediaServerItemInRedis = getOne(mediaServerItem.getId()); + if (mediaServerItemInRedis != null) { + mediaServerItem.setStatus(mediaServerItemInRedis.isStatus()); + } + } + return mediaServerList; + } + @Override public List getAllFromDatabase() { @@ -351,12 +335,15 @@ public class MediaServerServiceImpl implements IMediaServerService { public void add(MediaServerItem mediaServerItem) { mediaServerItem.setCreateTime(DateUtil.getNow()); mediaServerItem.setUpdateTime(DateUtil.getNow()); - mediaServerItem.setHookAliveInterval(30f); + if (mediaServerItem.getHookAliveInterval() == null || mediaServerItem.getHookAliveInterval() == 0F) { + mediaServerItem.setHookAliveInterval(10F); + } if (mediaServerItem.getType() == null) { logger.info("[添加媒体节点] 失败, mediaServerItem的类型:为空"); return; } if (mediaServerMapper.queryOne(mediaServerItem.getId()) != null) { + logger.info("[添加媒体节点] 失败, 媒体服务ID已存在,请修改媒体服务器配置, {}", mediaServerItem.getId()); throw new ControllerException(ErrorCode.ERROR100.getCode(),"保存失败,媒体服务ID [ " + mediaServerItem.getId() + " ] 已存在,请修改媒体服务器配置"); } IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType()); @@ -364,111 +351,17 @@ public class MediaServerServiceImpl implements IMediaServerService { logger.info("[添加媒体节点] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType()); return; } - if (mediaNodeServerService.checkNodeId(mediaServerItem)) { - mediaServerMapper.add(mediaServerItem); + mediaServerMapper.add(mediaServerItem); + if (mediaServerItem.isStatus()) { mediaNodeServerService.online(mediaServerItem); }else { - throw new ControllerException(ErrorCode.ERROR100.getCode(),"保存失败,媒体服务ID [ " + mediaServerItem.getId() + " ] 已存在,请修改媒体服务器配置"); + // 发送事件 + MediaServerChangeEvent event = new MediaServerChangeEvent(this); + event.setMediaServerItemList(mediaServerItem); + applicationEventPublisher.publishEvent(event); } } - @Override - public int addToDatabase(MediaServerItem mediaSerItem) { - return mediaServerMapper.add(mediaSerItem); - } - - @Override - public int updateToDatabase(MediaServerItem mediaSerItem) { - int result = 0; - if (mediaSerItem.isDefaultServer()) { - TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); - int delResult = mediaServerMapper.delDefault(); - if (delResult == 0) { - logger.error("移除数据库默认媒体服务节点节点失败"); - //事务回滚 - dataSourceTransactionManager.rollback(transactionStatus); - return 0; - } - result = mediaServerMapper.add(mediaSerItem); - dataSourceTransactionManager.commit(transactionStatus); //手动提交 - }else { - result = mediaServerMapper.update(mediaSerItem); - } - return result; - } - - /** - * 处理媒体服务节点上线 - * @param zlmServerConfig 媒体服务节点上线携带的参数 - */ - @Override - public void zlmServerOnline(ZLMServerConfig zlmServerConfig) { - - MediaServerItem serverItem = mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId()); - if (serverItem == null) { - logger.warn("[未注册的媒体服务节点] 拒接接入:{}来自{}:{}", zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() ); - logger.warn("请检查媒体服务节点的ID配置是否与WVP的一致"); - return; - }else { - logger.info("[媒体服务节点] 正在连接 : {} -> {}:{}", - zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort()); - } - serverItem.setHookAliveInterval(zlmServerConfig.getHookAliveInterval()); - if (serverItem.getHttpPort() == 0) { - serverItem.setHttpPort(zlmServerConfig.getHttpPort()); - } - if (serverItem.getHttpSSlPort() == 0) { - serverItem.setHttpSSlPort(zlmServerConfig.getHttpSSLport()); - } - if (serverItem.getRtmpPort() == 0) { - serverItem.setRtmpPort(zlmServerConfig.getRtmpPort()); - } - if (serverItem.getRtmpSSlPort() == 0) { - serverItem.setRtmpSSlPort(zlmServerConfig.getRtmpSslPort()); - } - if (serverItem.getRtspPort() == 0) { - serverItem.setRtspPort(zlmServerConfig.getRtspPort()); - } - if (serverItem.getRtspSSLPort() == 0) { - serverItem.setRtspSSLPort(zlmServerConfig.getRtspSSlport()); - } - if (serverItem.getRtpProxyPort() == 0) { - serverItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort()); - } - serverItem.setStatus(true); - - if (ObjectUtils.isEmpty(serverItem.getId())) { - logger.warn("[未注册的媒体服务节点] serverItem缺少ID, 无法接入:{}:{}", zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() ); - return; - } - mediaServerMapper.update(serverItem); - String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + zlmServerConfig.getGeneralMediaServerId(); - if (!ssrcFactory.hasMediaServerSSRC(serverItem.getId())) { - ssrcFactory.initMediaServerSSRC(zlmServerConfig.getGeneralMediaServerId(), null); - } - redisTemplate.opsForValue().set(key, serverItem); - resetOnlineServerItem(serverItem); - - - if (serverItem.isAutoConfig()) { - setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable())); - } - final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + serverItem.getId(); - dynamicTask.stop(zlmKeepaliveKey); - dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), (serverItem.getHookAliveInterval().intValue() + 5) * 1000); - publisher.mediaServerOnlineEventPublish(serverItem.getId()); - - logger.info("[媒体服务节点] 连接成功 {} - {}:{} ", - zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort()); - } - - @Override - public void zlmServerOffline(String mediaServerId) { - delete(mediaServerId); - final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + mediaServerId; - dynamicTask.stop(zlmKeepaliveKey); - } - @Override public void resetOnlineServerItem(MediaServerItem serverItem) { // 更新缓存 @@ -549,111 +442,23 @@ public class MediaServerServiceImpl implements IMediaServerService { return mediaServerItem; } - /** - * 对媒体服务节点服务器进行基础配置 - * @param mediaServerItem 服务ID - * @param restart 是否重启媒体服务节点 - */ @Override - public void setZLMConfig(MediaServerItem mediaServerItem, boolean restart) { - logger.info("[媒体服务节点] 正在设置 :{} -> {}:{}", - mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); - String protocol = sslEnabled ? "https" : "http"; - String hookPrefix = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort); - - Map param = new HashMap<>(); - param.put("api.secret",mediaServerItem.getSecret()); // -profile:v Baseline - if (mediaServerItem.getRtspPort() != 0) { - param.put("ffmpeg.snap", "%s -rtsp_transport tcp -i %s -y -f mjpeg -frames:v 1 %s"); - } - param.put("hook.enable","1"); - param.put("hook.on_flow_report",""); - param.put("hook.on_play",String.format("%s/on_play", hookPrefix)); - param.put("hook.on_http_access",""); - param.put("hook.on_publish", String.format("%s/on_publish", hookPrefix)); - param.put("hook.on_record_ts",""); - param.put("hook.on_rtsp_auth",""); - param.put("hook.on_rtsp_realm",""); - param.put("hook.on_server_started",String.format("%s/on_server_started", hookPrefix)); - param.put("hook.on_shell_login",""); - param.put("hook.on_stream_changed",String.format("%s/on_stream_changed", hookPrefix)); - param.put("hook.on_stream_none_reader",String.format("%s/on_stream_none_reader", hookPrefix)); - param.put("hook.on_stream_not_found",String.format("%s/on_stream_not_found", hookPrefix)); - param.put("hook.on_server_keepalive",String.format("%s/on_server_keepalive", hookPrefix)); - param.put("hook.on_send_rtp_stopped",String.format("%s/on_send_rtp_stopped", hookPrefix)); - param.put("hook.on_rtp_server_timeout",String.format("%s/on_rtp_server_timeout", hookPrefix)); - param.put("hook.on_record_mp4",String.format("%s/on_record_mp4", hookPrefix)); - param.put("hook.timeoutSec","20"); - // 推流断开后可以在超时时间内重新连接上继续推流,这样播放器会接着播放。 - // 置0关闭此特性(推流断开会导致立即断开播放器) - // 此参数不应大于播放器超时时间 - // 优化此消息以更快的收到流注销事件 - param.put("protocol.continue_push_ms", "3000" ); - // 最多等待未初始化的Track时间,单位毫秒,超时之后会忽略未初始化的Track, 设置此选项优化那些音频错误的不规范流, - // 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项 - if (mediaServerItem.isRtpEnable() && !ObjectUtils.isEmpty(mediaServerItem.getRtpPortRange())) { - param.put("rtp_proxy.port_range", mediaServerItem.getRtpPortRange().replace(",", "-")); - } - - if (!ObjectUtils.isEmpty(mediaServerItem.getRecordPath())) { - File recordPathFile = new File(mediaServerItem.getRecordPath()); - param.put("protocol.mp4_save_path", recordPathFile.getParentFile().getPath()); - param.put("protocol.downloadRoot", recordPathFile.getParentFile().getPath()); - param.put("record.appName", recordPathFile.getName()); - } - - JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param); - - if (responseJSON != null && responseJSON.getInteger("code") == 0) { - if (restart) { - logger.info("[媒体服务节点] 设置成功,开始重启以保证配置生效 {} -> {}:{}", - mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); - zlmresTfulUtils.restartServer(mediaServerItem); - }else { - logger.info("[媒体服务节点] 设置成功 {} -> {}:{}", - mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); - } - - - }else { - logger.info("[媒体服务节点] 设置媒体服务节点失败 {} -> {}:{}", - mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); - } - - - } - - - @Override - public MediaServerItem checkMediaServer(String ip, int port, String secret) { + public MediaServerItem checkMediaServer(String ip, int port, String secret, String type) { if (mediaServerMapper.queryOneByHostAndPort(ip, port) != null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "此连接已存在"); } - MediaServerItem mediaServerItem = new MediaServerItem(); - mediaServerItem.setIp(ip); - mediaServerItem.setHttpPort(port); - mediaServerItem.setSecret(secret); - JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem); - if (responseJSON == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "连接失败"); + + IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(type); + if (mediaNodeServerService == null) { + logger.info("[closeRTPServer] 失败, mediaServerItem的类型: {},未找到对应的实现类", type); + return null; } - JSONArray data = responseJSON.getJSONArray("data"); - ZLMServerConfig zlmServerConfig = JSON.parseObject(JSON.toJSONString(data.get(0)), ZLMServerConfig.class); - if (zlmServerConfig == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "读取配置失败"); + MediaServerItem mediaServerItem = mediaNodeServerService.checkMediaServer(ip, port, secret); + if (mediaServerItem != null) { + if (mediaServerMapper.queryOne(mediaServerItem.getId()) != null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "媒体服务ID [" + mediaServerItem.getId() + " ] 已存在,请修改媒体服务器配置"); + } } - if (mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId()) != null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "媒体服务ID [" + zlmServerConfig.getGeneralMediaServerId() + " ] 已存在,请修改媒体服务器配置"); - } - mediaServerItem.setHttpSSlPort(zlmServerConfig.getHttpPort()); - mediaServerItem.setRtmpPort(zlmServerConfig.getRtmpPort()); - mediaServerItem.setRtmpSSlPort(zlmServerConfig.getRtmpSslPort()); - mediaServerItem.setRtspPort(zlmServerConfig.getRtspPort()); - mediaServerItem.setRtspSSLPort(zlmServerConfig.getRtspSSlport()); - mediaServerItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort()); - mediaServerItem.setStreamIp(ip); - mediaServerItem.setHookIp(sipConfig.getIp().split(",")[0]); - mediaServerItem.setSdpIp(ip); return mediaServerItem; } @@ -678,46 +483,24 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public void delete(String id) { + mediaServerMapper.delOne(id); redisTemplate.opsForZSet().remove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(), id); String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + id; redisTemplate.delete(key); - } - @Override - public void deleteDb(String id){ - //同步删除数据库中的数据 - mediaServerMapper.delOne(id); + // 发送节点移除通知 + MediaServerDeleteEvent event = new MediaServerDeleteEvent(this); + event.setMediaServerId(id); + applicationEventPublisher.publishEvent(event); } @Override - public void updateMediaServerKeepalive(String mediaServerId, ServerKeepaliveData data) { - MediaServerItem mediaServerItem = getOne(mediaServerId); - if (mediaServerItem == null) { - // 缓存不存在,从数据库查询,如果数据库不存在则是错误的 - mediaServerItem = getOneFromDatabase(mediaServerId); - if (mediaServerItem == null) { - logger.warn("[更新媒体服务节点 保活信息] 流媒体{}尚未加入使用,请检查节点中是否含有此流媒体 ", mediaServerId); - return; - } - // 媒体服务节点连接重试 - logger.warn("[更新媒体服务节点 保活信息]尝试链接zml id {}", mediaServerId); - ssrcFactory.initMediaServerSSRC(mediaServerItem.getId(), null); - String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId(); - redisTemplate.opsForValue().set(key, mediaServerItem); - resetOnlineServerItem(mediaServerItem); - clearRTPServer(mediaServerItem); - } - final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + mediaServerItem.getId(); - dynamicTask.stop(zlmKeepaliveKey); - dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(mediaServerItem), (mediaServerItem.getHookAliveInterval().intValue() + 5) * 1000); - } - - private MediaServerItem getOneFromDatabase(String mediaServerId) { + public MediaServerItem getOneFromDatabase(String mediaServerId) { return mediaServerMapper.queryOne(mediaServerId); } @Override public void syncCatchFromDatabase() { - List allInCatch = getAll(); + List allInCatch = getAllOnlineList(); List allInDatabase = mediaServerMapper.queryAll(); Map mediaServerItemMap = new HashMap<>(); @@ -748,4 +531,6 @@ public class MediaServerServiceImpl implements IMediaServerService { public List getAllWithAssistPort() { return mediaServerMapper.queryAllWithAssistPort(); } + + } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java index 3f28d02a..fda2e67c 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java @@ -83,6 +83,7 @@ public class SendRtpPortManager { } private synchronized int getSendPort(int startPort, int endPort, String sendIndexKey, Map sendRtpItemMap){ + // TODO 这里改为只取偶数端口 RedisAtomicInteger redisAtomicInteger = new RedisAtomicInteger(sendIndexKey , redisTemplate.getConnectionFactory()); if (redisAtomicInteger.get() < startPort) { redisAtomicInteger.set(startPort); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 9bab3721..4b2557de 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -10,8 +10,8 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; @@ -21,6 +21,8 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; +import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerKeepaliveEvent; +import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerStartEvent; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -35,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.ObjectUtils; @@ -70,9 +73,6 @@ public class ZLMHttpHookListener { @Autowired private AudioBroadcastManager audioBroadcastManager; - @Autowired - private ZLMServerFactory zlmServerFactory; - @Autowired private IPlayService playService; @@ -121,9 +121,6 @@ public class ZLMHttpHookListener { @Autowired private VideoStreamSessionManager sessionManager; - @Autowired - private AssistRESTfulUtils assistRESTfulUtils; - @Autowired private SSRCFactory ssrcFactory; @@ -134,6 +131,9 @@ public class ZLMHttpHookListener { @Autowired private RedisTemplate redisTemplate; + @Autowired + private ApplicationEventPublisher applicationEventPublisher; + /** * 服务器定时上报时间,上报间隔可配置,默认10s上报一次 */ @@ -141,8 +141,6 @@ public class ZLMHttpHookListener { @PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8") public HookResult onServerKeepalive(@RequestBody OnServerKeepaliveHookParam param) { - - taskExecutor.execute(() -> { List subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive); if (subscribes != null && subscribes.size() > 0) { @@ -151,8 +149,16 @@ public class ZLMHttpHookListener { } } }); - mediaServerService.updateMediaServerKeepalive(param.getMediaServerId(), param.getData()); - + try { + HookZlmServerKeepaliveEvent event = new HookZlmServerKeepaliveEvent(this); + MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId()); + if (mediaServerItem != null) { + event.setMediaServerItem(mediaServerItem); + applicationEventPublisher.publishEvent(event); + } + }catch (Exception e) { + logger.info("[ZLM-HOOK-心跳] 发送通知失败 ", e); + } return HookResult.SUCCESS(); } @@ -160,7 +166,6 @@ public class ZLMHttpHookListener { * 播放器鉴权事件,rtsp/rtmp/http-flv/ws-flv/hls的播放都将触发此鉴权事件。 */ @ResponseBody - @PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8") public HookResult onPlay(@RequestBody OnPlayHookParam param) { if (logger.isDebugEnabled()) { @@ -199,6 +204,7 @@ public class ZLMHttpHookListener { JSONObject json = (JSONObject) JSON.toJSON(param); logger.info("[ZLM HOOK]推流鉴权:{}->{}", param.getMediaServerId(), param); + // TODO 加快处理速度 String mediaServerId = json.getString("mediaServerId"); MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); @@ -801,8 +807,17 @@ public class ZLMHttpHookListener { subscribe.response(null, zlmServerConfig); } } - mediaServerService.zlmServerOnline(zlmServerConfig); }); + try { + HookZlmServerStartEvent event = new HookZlmServerStartEvent(this); + MediaServerItem mediaServerItem = mediaServerService.getOne(zlmServerConfig.getMediaServerId()); + if (mediaServerItem != null) { + event.setMediaServerItem(mediaServerItem); + applicationEventPublisher.publishEvent(event); + } + }catch (Exception e) { + logger.info("[ZLM-HOOK-ZLM启动] 发送通知失败 ", e); + } return HookResult.SUCCESS(); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java index 510a972c..de330c30 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java @@ -4,10 +4,13 @@ import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.media.service.IMediaNodeServerService; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @Service("zlm") @@ -19,9 +22,12 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { @Autowired private ZLMServerFactory zlmServerFactory; + @Value("${sip.ip}") + private String sipIp; + @Override public int createRTPServer(MediaServerItem mediaServerItem, String streamId, long ssrc, Integer port, Boolean onlyAuto, Boolean reUsePort, Integer tcpMode) { - return zlmServerFactory.createRTPServer(mediaServerItem, streamId, ssrc, port, onlyAuto, reUsePort, tcpMode);; + return zlmServerFactory.createRTPServer(mediaServerItem, streamId, ssrc, port, onlyAuto, reUsePort, tcpMode); } @Override @@ -68,4 +74,36 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { public void online(MediaServerItem mediaServerItem) { } + + @Override + public MediaServerItem checkMediaServer(String ip, int port, String secret) { + MediaServerItem mediaServerItem = new MediaServerItem(); + mediaServerItem.setIp(ip); + mediaServerItem.setHttpPort(port); + mediaServerItem.setSecret(secret); + JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem); + if (responseJSON == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "连接失败"); + } + JSONArray data = responseJSON.getJSONArray("data"); + if (data == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "读取配置失败"); + } + ZLMServerConfig zlmServerConfig = JSON.parseObject(JSON.toJSONString(data.get(0)), ZLMServerConfig.class); + if (zlmServerConfig == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "读取配置失败"); + } + mediaServerItem.setId(zlmServerConfig.getGeneralMediaServerId()); + mediaServerItem.setHttpSSlPort(zlmServerConfig.getHttpPort()); + mediaServerItem.setRtmpPort(zlmServerConfig.getRtmpPort()); + mediaServerItem.setRtmpSSlPort(zlmServerConfig.getRtmpSslPort()); + mediaServerItem.setRtspPort(zlmServerConfig.getRtspPort()); + mediaServerItem.setRtspSSLPort(zlmServerConfig.getRtspSSlport()); + mediaServerItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort()); + mediaServerItem.setStreamIp(ip); + mediaServerItem.setHookIp(sipIp.split(",")[0]); + mediaServerItem.setSdpIp(ip); + mediaServerItem.setType("zlm"); + return mediaServerItem; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java index 0637b531..6f97558d 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java @@ -4,7 +4,9 @@ import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.media.event.MediaServerChangeEvent; +import com.genersoft.iot.vmp.media.event.MediaServerDeleteEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig; @@ -13,12 +15,17 @@ import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerStartEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; +import java.io.File; +import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; /** @@ -42,6 +49,15 @@ public class ZLMMediaServerStatusManger { @Autowired private DynamicTask dynamicTask; + @Value("${server.ssl.enabled:false}") + private boolean sslEnabled; + + @Value("${server.port}") + private Integer serverPort; + + @Autowired + private UserSetting userSetting; + private final String type = "zlm"; @Async("taskExecutor") @@ -74,15 +90,13 @@ public class ZLMMediaServerStatusManger { return; } logger.info("[ZLM-HOOK事件-服务启动] ID:" + event.getMediaServerItem().getId()); - online(serverItem); + online(serverItem, null); } @Async("taskExecutor") @EventListener public void onApplicationEvent(HookZlmServerKeepaliveEvent event) { - if (event.getMediaServerItem() == null - || !type.equals(event.getMediaServerItem().getType()) - || event.getMediaServerItem().isStatus()) { + if (event.getMediaServerItem() == null) { return; } MediaServerItem serverItem = mediaServerService.getOne(event.getMediaServerItem().getId()); @@ -90,7 +104,19 @@ public class ZLMMediaServerStatusManger { return; } logger.info("[ZLM-HOOK事件-心跳] ID:" + event.getMediaServerItem().getId()); - online(serverItem); + online(serverItem, null); + } + + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaServerDeleteEvent event) { + if (event.getMediaServerId() == null) { + return; + } + logger.info("[ZLM-节点被移除] ID:" + event.getMediaServerId()); + offlineZlmPrimaryMap.remove(event.getMediaServerId()); + offlineZlmsecondaryMap.remove(event.getMediaServerId()); + offlineZlmTimeMap.remove(event.getMediaServerId()); } @Scheduled(fixedDelay = 10*1000) //每隔10秒检查一次 @@ -101,7 +127,7 @@ public class ZLMMediaServerStatusManger { } if (!offlineZlmPrimaryMap.isEmpty()) { for (MediaServerItem mediaServerItem : offlineZlmPrimaryMap.values()) { - if (offlineZlmTimeMap.get(mediaServerItem.getId()) > 30*60*1000) { + if (offlineZlmTimeMap.get(mediaServerItem.getId()) < System.currentTimeMillis() - 30*60*1000) { offlineZlmsecondaryMap.put(mediaServerItem.getId(), mediaServerItem); offlineZlmPrimaryMap.remove(mediaServerItem.getId()); continue; @@ -119,13 +145,13 @@ public class ZLMMediaServerStatusManger { }else { zlmServerConfig = JSON.parseObject(JSON.toJSONString(data.get(0)), ZLMServerConfig.class); initPort(mediaServerItem, zlmServerConfig); - online(mediaServerItem); + online(mediaServerItem, zlmServerConfig); } } } if (!offlineZlmsecondaryMap.isEmpty()) { for (MediaServerItem mediaServerItem : offlineZlmsecondaryMap.values()) { - if (offlineZlmTimeMap.get(mediaServerItem.getId()) < 30*60*1000) { + if (offlineZlmTimeMap.get(mediaServerItem.getId()) < System.currentTimeMillis() - 30*60*1000) { continue; } logger.info("[ZLM-尝试连接] ID:{}, 地址: {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); @@ -143,20 +169,34 @@ public class ZLMMediaServerStatusManger { }else { zlmServerConfig = JSON.parseObject(JSON.toJSONString(data.get(0)), ZLMServerConfig.class); initPort(mediaServerItem, zlmServerConfig); - online(mediaServerItem); + online(mediaServerItem, zlmServerConfig); } } } } - private void online(MediaServerItem mediaServerItem) { + private void online(MediaServerItem mediaServerItem, ZLMServerConfig config) { offlineZlmPrimaryMap.remove(mediaServerItem.getId()); offlineZlmsecondaryMap.remove(mediaServerItem.getId()); offlineZlmTimeMap.remove(mediaServerItem.getId()); if (!mediaServerItem.isStatus()) { logger.info("[ZLM-连接成功] ID:{}, 地址: {}:{}", mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); mediaServerItem.setStatus(true); + mediaServerItem.setHookAliveInterval(10F); mediaServerService.update(mediaServerItem); + if(mediaServerItem.isAutoConfig()) { + if (config == null) { + JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem); + JSONArray data = responseJSON.getJSONArray("data"); + if (data != null && !data.isEmpty()) { + config = JSON.parseObject(JSON.toJSONString(data.get(0)), ZLMServerConfig.class); + } + } + if (config != null) { + setZLMConfig(mediaServerItem, "0".equals(config.getHookEnable()) + || !Objects.equals(mediaServerItem.getHookAliveInterval(), config.getHookAliveInterval())); + } + } } // 设置两次心跳未收到则认为zlm离线 String key = "zlm-keepalive-" + mediaServerItem.getId(); @@ -165,6 +205,8 @@ public class ZLMMediaServerStatusManger { mediaServerItem.setStatus(false); offlineZlmPrimaryMap.put(mediaServerItem.getId(), mediaServerItem); offlineZlmTimeMap.put(mediaServerItem.getId(), System.currentTimeMillis()); + // TODO 发送离线通知 + mediaServerService.update(mediaServerItem); }, (int)(mediaServerItem.getHookAliveInterval() * 2 * 1000)); } private void initPort(MediaServerItem mediaServerItem, ZLMServerConfig zlmServerConfig) { @@ -186,7 +228,72 @@ public class ZLMMediaServerStatusManger { if (mediaServerItem.getRtpProxyPort() == 0) { mediaServerItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort()); } - mediaServerItem.setHookAliveInterval(zlmServerConfig.getHookAliveInterval()); + mediaServerItem.setHookAliveInterval(10F); + } + + public void setZLMConfig(MediaServerItem mediaServerItem, boolean restart) { + logger.info("[媒体服务节点] 正在设置 :{} -> {}:{}", + mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); + String protocol = sslEnabled ? "https" : "http"; + String hookPrefix = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort); + + Map param = new HashMap<>(); + param.put("api.secret",mediaServerItem.getSecret()); // -profile:v Baseline + if (mediaServerItem.getRtspPort() != 0) { + param.put("ffmpeg.snap", "%s -rtsp_transport tcp -i %s -y -f mjpeg -frames:v 1 %s"); + } + param.put("hook.enable","1"); + param.put("hook.on_flow_report",""); + param.put("hook.on_play",String.format("%s/on_play", hookPrefix)); + param.put("hook.on_http_access",""); + param.put("hook.on_publish", String.format("%s/on_publish", hookPrefix)); + param.put("hook.on_record_ts",""); + param.put("hook.on_rtsp_auth",""); + param.put("hook.on_rtsp_realm",""); + param.put("hook.on_server_started",String.format("%s/on_server_started", hookPrefix)); + param.put("hook.on_shell_login",""); + param.put("hook.on_stream_changed",String.format("%s/on_stream_changed", hookPrefix)); + param.put("hook.on_stream_none_reader",String.format("%s/on_stream_none_reader", hookPrefix)); + param.put("hook.on_stream_not_found",String.format("%s/on_stream_not_found", hookPrefix)); + param.put("hook.on_server_keepalive",String.format("%s/on_server_keepalive", hookPrefix)); + param.put("hook.on_send_rtp_stopped",String.format("%s/on_send_rtp_stopped", hookPrefix)); + param.put("hook.on_rtp_server_timeout",String.format("%s/on_rtp_server_timeout", hookPrefix)); + param.put("hook.on_record_mp4",String.format("%s/on_record_mp4", hookPrefix)); + param.put("hook.timeoutSec","30"); + param.put("hook.alive_interval", mediaServerItem.getHookAliveInterval()); + // 推流断开后可以在超时时间内重新连接上继续推流,这样播放器会接着播放。 + // 置0关闭此特性(推流断开会导致立即断开播放器) + // 此参数不应大于播放器超时时间 + // 优化此消息以更快的收到流注销事件 + param.put("protocol.continue_push_ms", "3000" ); + // 最多等待未初始化的Track时间,单位毫秒,超时之后会忽略未初始化的Track, 设置此选项优化那些音频错误的不规范流, + // 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项 + if (mediaServerItem.isRtpEnable() && !ObjectUtils.isEmpty(mediaServerItem.getRtpPortRange())) { + param.put("rtp_proxy.port_range", mediaServerItem.getRtpPortRange().replace(",", "-")); + } + + if (!ObjectUtils.isEmpty(mediaServerItem.getRecordPath())) { + File recordPathFile = new File(mediaServerItem.getRecordPath()); + param.put("protocol.mp4_save_path", recordPathFile.getParentFile().getPath()); + param.put("protocol.downloadRoot", recordPathFile.getParentFile().getPath()); + param.put("record.appName", recordPathFile.getName()); + } + + JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param); + + if (responseJSON != null && responseJSON.getInteger("code") == 0) { + if (restart) { + logger.info("[媒体服务节点] 设置成功,开始重启以保证配置生效 {} -> {}:{}", + mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); + zlmresTfulUtils.restartServer(mediaServerItem); + }else { + logger.info("[媒体服务节点] 设置成功 {} -> {}:{}", + mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); + } + }else { + logger.info("[媒体服务节点] 设置媒体服务节点失败 {} -> {}:{}", + mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java index 46785911..131b8a52 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java @@ -34,6 +34,7 @@ public interface MediaServerMapper { "record_day,"+ "record_path,"+ "default_server,"+ + "type,"+ "create_time,"+ "update_time,"+ "hook_alive_interval"+ @@ -60,6 +61,7 @@ public interface MediaServerMapper { "#{recordDay}, " + "#{recordPath}, " + "#{defaultServer}, " + + "#{type}, " + "#{createTime}, " + "#{updateTime}, " + "#{hookAliveInterval})") @@ -88,6 +90,7 @@ public interface MediaServerMapper { ", hook_alive_interval=#{hookAliveInterval}" + ", record_day=#{recordDay}" + ", record_path=#{recordPath}" + + ", type=#{type}" + "WHERE id=#{id}"+ " "}) int update(MediaServerItem mediaServerItem); @@ -113,6 +116,7 @@ public interface MediaServerMapper { ", record_assist_port=#{recordAssistPort}" + ", record_day=#{recordDay}" + ", record_path=#{recordPath}" + + ", type=#{type}" + ", hook_alive_interval=#{hookAliveInterval}" + "WHERE ip=#{ip} and http_port=#{httpPort}"+ " "}) diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java index 926d8a67..bf9cf4fe 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java @@ -76,7 +76,7 @@ public class CloudRecordController { } mediaServerItems.add(mediaServerItem); } else { - mediaServerItems = mediaServerService.getAll(); + mediaServerItems = mediaServerService.getAllOnlineList(); } if (mediaServerItems.isEmpty()) { return new ArrayList<>(); @@ -119,7 +119,7 @@ public class CloudRecordController { } mediaServerItems.add(mediaServerItem); } else { - mediaServerItems = mediaServerService.getAll(); + mediaServerItems = mediaServerService.getAllOnlineList(); } if (mediaServerItems.isEmpty()) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "当前无流媒体"); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java index 65cb8a65..09f0abb9 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java @@ -108,8 +108,8 @@ public class ServerController { @Parameter(name = "secret", description = "流媒体服务secret", required = true) @GetMapping(value = "/media_server/check") @ResponseBody - public MediaServerItem checkMediaServer(@RequestParam String ip, @RequestParam int port, @RequestParam String secret) { - return mediaServerService.checkMediaServer(ip, port, secret); + public MediaServerItem checkMediaServer(@RequestParam String ip, @RequestParam int port, @RequestParam String secret, @RequestParam String type) { + return mediaServerService.checkMediaServer(ip, port, secret, type); } @Operation(summary = "测试流媒体录像管理服务", security = @SecurityRequirement(name = JwtUtils.HEADER)) @@ -129,7 +129,7 @@ public class ServerController { @PostMapping(value = "/media_server/save") @ResponseBody public void saveMediaServer(@RequestBody MediaServerItem mediaServerItem) { - MediaServerItem mediaServerItemInDatabase = mediaServerService.getOne(mediaServerItem.getId()); + MediaServerItem mediaServerItemInDatabase = mediaServerService.getOneFromDatabase(mediaServerItem.getId()); if (mediaServerItemInDatabase != null) { mediaServerService.update(mediaServerItem); @@ -143,11 +143,7 @@ public class ServerController { @DeleteMapping(value = "/media_server/delete") @ResponseBody public void deleteMediaServer(@RequestParam String id) { - if (mediaServerService.getOne(id) == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到此节点"); - } mediaServerService.delete(id); - mediaServerService.deleteDb(id); } diff --git a/web_src/src/components/dialog/MediaServerEdit.vue b/web_src/src/components/dialog/MediaServerEdit.vue index 9808a1c2..9d8491eb 100755 --- a/web_src/src/components/dialog/MediaServerEdit.vue +++ b/web_src/src/components/dialog/MediaServerEdit.vue @@ -20,6 +20,12 @@ + + + + + +
下一步 @@ -170,7 +176,7 @@ export default { hookIp: "", sdpIp: "", streamIp: "", - secret: "035c73f7-bb6b-4889-a715-d9eb2d1925cc", + secret: "", httpPort: "", httpSSlPort: "", recordAssistPort: "", @@ -182,6 +188,7 @@ export default { rtpProxyPort: "", rtspPort: "", rtspSSLPort: "", + type: "zlm", }, rtpPortRange1:30000, rtpPortRange2:30500, @@ -330,7 +337,7 @@ export default { hookIp: "", sdpIp: "", streamIp: "", - secret: "035c73f7-bb6b-4889-a715-d9eb2d1925cc", + secret: "", httpPort: "", httpSSlPort: "", recordAssistPort: "", diff --git a/web_src/src/components/service/MediaServer.js b/web_src/src/components/service/MediaServer.js index d4446f06..b0495373 100755 --- a/web_src/src/components/service/MediaServer.js +++ b/web_src/src/components/service/MediaServer.js @@ -45,7 +45,8 @@ class MediaServer{ params: { ip: param.ip, port: param.httpPort, - secret: param.secret + secret: param.secret, + type: param.type } }).then(function (res) { if (typeof (callback) == "function") callback(res.data) diff --git a/数据库/abl/更新-mysql-2.7.0.sql b/数据库/abl/更新-mysql-2.7.0.sql index 2b38f06d..de92cb2c 100644 --- a/数据库/abl/更新-mysql-2.7.0.sql +++ b/数据库/abl/更新-mysql-2.7.0.sql @@ -1,2 +1,2 @@ alter table wvp_media_server - add type character varying(50) default 'zlm',; + add type character varying(50) default 'zlm';