From d022eb72d648d21eb0121388b4360e44a53d8405 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 2 Jan 2025 16:16:50 +0800 Subject: [PATCH] =?UTF-8?q?[=E9=9B=86=E7=BE=A4-=E8=87=AA=E5=8A=A8=E5=88=87?= =?UTF-8?q?=E6=8D=A2=E8=BF=87=E5=9B=BD=E6=A0=87=E7=BA=A7=E8=81=94]=20?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=A3=80=E6=B5=8Bwvp=E5=A5=94=E6=BA=83?= =?UTF-8?q?=E4=B8=8E=E9=87=8D=E6=96=B0=E6=B3=A8=E5=86=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/common/ServerInfo.java | 7 ++- .../iot/vmp/common/VideoManagerConstants.java | 2 + .../com/genersoft/iot/vmp/conf/SipConfig.java | 4 ++ .../genersoft/iot/vmp/gb28181/SipLayer.java | 3 +- .../iot/vmp/gb28181/dao/PlatformMapper.java | 3 + .../service/impl/PlatformServiceImpl.java | 60 ++++++++++++++++++- .../iot/vmp/storager/IRedisCatchStorage.java | 4 ++ .../storager/impl/RedisCatchStorageImpl.java | 15 ++++- 8 files changed, 90 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/common/ServerInfo.java b/src/main/java/com/genersoft/iot/vmp/common/ServerInfo.java index f03f7190..fb1941a1 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/ServerInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/common/ServerInfo.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.common; +import com.genersoft.iot.vmp.utils.DateUtil; import lombok.Data; @Data @@ -10,13 +11,13 @@ public class ServerInfo { /** * 现在使用的线程数 */ - private int threadNumber; + private String createTime; - public static ServerInfo create(String ip, int port, int threadNumber) { + public static ServerInfo create(String ip, int port) { ServerInfo serverInfo = new ServerInfo(); serverInfo.setIp(ip); serverInfo.setPort(port); - serverInfo.setThreadNumber(threadNumber); + serverInfo.setCreateTime(DateUtil.getNow()); return serverInfo; } } diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index c89653b1..cee1993f 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -10,6 +10,8 @@ public class VideoManagerConstants { public static final String WVP_SERVER_PREFIX = "VMP_SIGNALLING_SERVER_INFO_"; + public static final String WVP_SERVER_LIST = "VMP_SERVER_LIST"; + public static final String WVP_SERVER_STREAM_PREFIX = "VMP_SIGNALLING_STREAM_"; public static final String MEDIA_SERVER_PREFIX = "VMP_MEDIA_SERVER_INFO:"; diff --git a/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java index 4742daa5..68fc5ab2 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java @@ -6,6 +6,8 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; +import java.util.List; + @Component @ConfigurationProperties(prefix = "sip", ignoreInvalidFields = true) @Order(0) @@ -16,6 +18,8 @@ public class SipConfig { private String showIp; + private List monitorIps; + private Integer port; private String domain; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java index cbbb2959..78351bc2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java @@ -80,11 +80,12 @@ public class SipLayer implements CommandLineRunner { monitorIps.add(sipConfig.getIp()); } } + sipConfig.setMonitorIps(monitorIps); if (ObjectUtils.isEmpty(sipConfig.getShowIp())){ sipConfig.setShowIp(String.join(",", monitorIps)); } SipFactory.getInstance().setPathName("gov.nist"); - if (monitorIps.size() > 0) { + if (!monitorIps.isEmpty()) { for (String monitorIp : monitorIps) { addListeningPoint(monitorIp, sipConfig.getPort()); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java index d65075ee..5206f336 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java @@ -94,4 +94,7 @@ public interface PlatformMapper { @Select("SELECT server_id FROM wvp_platform WHERE enable=true and server_id != #{serverId} group by server_id") List queryServerIdsWithEnable(@Param("serverId") String serverId); + + @Select("SELECT * FROM wvp_platform WHERE server_id == #{serverId}") + List queryByServerId(@Param("serverId") String serverId); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index ef05f3ee..795ae13e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.service.impl; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.*; import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; @@ -67,6 +68,7 @@ public class PlatformServiceImpl implements IPlatformService { @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired private SSRCFactory ssrcFactory; @@ -85,6 +87,9 @@ public class PlatformServiceImpl implements IPlatformService { @Autowired private UserSetting userSetting; + @Autowired + private SipConfig sipConfig; + @Autowired private SipInviteSessionManager sessionManager; @@ -114,12 +119,61 @@ public class PlatformServiceImpl implements IPlatformService { serverIds.forEach(serverId -> { // 检查每个是否存活 ServerInfo serverInfo = redisCatchStorage.queryServerInfo(serverId); - if (serverInfo == null) { - // 此平台需要选择新平台处理 - + if (serverInfo == null && userSetting.getServerId().equals(redisCatchStorage.chooseOneServer())) { + // 此平台需要选择新平台处理, 确定由当前平台即开始处理 + List platformList = platformMapper.queryByServerId(serverId); + platformList.forEach(platform -> { + // 设置平台使用当前平台的IP + platform.setAddress(getIpWithSameNetwork(platform.getAddress())); + platformMapper.update(platform); + // 更新redis + redisCatchStorage.delPlatformCatchInfo(platform.getServerGBId()); + PlatformCatch platformCatch = new PlatformCatch(); + platformCatch.setPlatform(platform); + platformCatch.setId(platform.getServerGBId()); + redisCatchStorage.updatePlatformCatchInfo(platformCatch); + // 开始注册 + // 注册成功时由程序直接调用了online方法 + try { + commanderForPlatform.register(platform, eventResult -> { + log.info("[国标级联] {}({}),添加向上级注册失败,请确定上级平台可用时重新保存", platform.getName(), platform.getServerGBId()); + }, null); + } catch (InvalidArgumentException | ParseException | SipException e) { + log.error("[命令发送失败] 国标级联: {}", e.getMessage()); + } + }); } }); + } + /** + * 获取同网段的IP + */ + private String getIpWithSameNetwork(String ip){ + if (ip == null || sipConfig.getMonitorIps().size() == 1) { + return sipConfig.getMonitorIps().get(0); + } + String[] ipSplit = ip.split("\\."); + String ip1 = null, ip2 = null, ip3 = null; + for (String monitorIp : sipConfig.getMonitorIps()) { + String[] monitorIpSplit = monitorIp.split("\\."); + if (monitorIpSplit[0].equals(ipSplit[0]) && monitorIpSplit[1].equals(ipSplit[1]) && monitorIpSplit[2].equals(ipSplit[2])) { + ip3 = monitorIp; + }else if (monitorIpSplit[0].equals(ipSplit[0]) && monitorIpSplit[1].equals(ipSplit[1])) { + ip2 = monitorIp; + }else if (monitorIpSplit[0].equals(ipSplit[0])) { + ip1 = monitorIp; + } + } + if (ip3 != null) { + return ip3; + }else if (ip2 != null) { + return ip2; + }else if (ip1 != null) { + return ip1; + }else { + return sipConfig.getMonitorIps().get(0); + } } /** diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 78f26a18..66415fc9 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -190,4 +190,8 @@ public interface IRedisCatchStorage { void sendPushStreamOnline(SendRtpInfo sendRtpItem); ServerInfo queryServerInfo(String serverId); + + String chooseOneServer(); + + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index ec60df6d..5e6ab306 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -114,7 +114,10 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { String key = VideoManagerConstants.WVP_SERVER_PREFIX + userSetting.getServerId(); Duration duration = Duration.ofSeconds(time); redisTemplate.opsForValue().set(key, serverInfo, duration); - // + // 设置平台的分数值 + String setKey = VideoManagerConstants.WVP_SERVER_LIST; + // 首次设置就设置为0, 后续值越小说明越是最近启动的 + redisTemplate.opsForZSet().add(setKey, userSetting.getServerId(), System.currentTimeMillis()); } @Override @@ -540,4 +543,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { String key = VideoManagerConstants.WVP_SERVER_PREFIX + serverId; return (ServerInfo)redisTemplate.opsForValue().get(key); } + + @Override + public String chooseOneServer() { + String key = VideoManagerConstants.WVP_SERVER_LIST; + Set range = redisTemplate.opsForZSet().range(key, 0, 0); + if (range == null || range.isEmpty()) { + return null; + } + return (String) range.iterator().next(); + } }