diff --git a/pom.xml b/pom.xml index b316e980..7ec73b81 100644 --- a/pom.xml +++ b/pom.xml @@ -382,6 +382,44 @@ true + + + org.apache.maven.plugins + maven-jar-plugin + 3.3.0 + + + **/all-application.yml + **/application.yml + **/application-*.yml + **/local.jks + + + + + maven-resources-plugin + + + copy-resources + package + + copy-resources + + + + + src/main/resources + + application.yml + application-*.yml + + + + ${project.build.directory} + + + + diff --git a/src/main/java/com/genersoft/iot/vmp/conf/CivilCodeFileConf.java b/src/main/java/com/genersoft/iot/vmp/conf/CivilCodeFileConf.java index 20b6eef7..8449ebb1 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/CivilCodeFileConf.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/CivilCodeFileConf.java @@ -1,7 +1,7 @@ package com.genersoft.iot.vmp.conf; import com.genersoft.iot.vmp.common.CivilCodePo; -import org.ehcache.impl.internal.concurrent.ConcurrentHashMap; +import com.genersoft.iot.vmp.utils.CivilCodeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -17,7 +17,8 @@ import java.io.File; import java.io.InputStream; import java.io.InputStreamReader; import java.nio.file.Files; -import java.util.Map; +import java.util.ArrayList; +import java.util.List; /** * 启动时读取行政区划表 @@ -28,8 +29,6 @@ public class CivilCodeFileConf implements CommandLineRunner { private final static Logger logger = LoggerFactory.getLogger(CivilCodeFileConf.class); - private final Map civilCodeMap= new ConcurrentHashMap<>(); - @Autowired @Lazy private UserSetting userSetting; @@ -62,6 +61,7 @@ public class CivilCodeFileConf implements CommandLineRunner { BufferedReader inputStreamReader = new BufferedReader(new InputStreamReader(inputStream)); int index = -1; String line; + List civilCodePoList = new ArrayList<>(); while ((line = inputStreamReader.readLine()) != null) { index ++; if (index == 0) { @@ -69,36 +69,15 @@ public class CivilCodeFileConf implements CommandLineRunner { } String[] infoArray = line.split(","); CivilCodePo civilCodePo = CivilCodePo.getInstance(infoArray); - civilCodeMap.put(civilCodePo.getCode(), civilCodePo); + civilCodePoList.add(civilCodePo); } + CivilCodeUtil.INSTANCE.add(civilCodePoList); inputStreamReader.close(); inputStream.close(); - if (civilCodeMap.size() == 0) { + if (civilCodePoList.isEmpty()) { logger.warn("[行政区划] 文件内容为空,可能造成目录刷新结果不完整"); }else { - logger.info("[行政区划] 加载成功,共加载数据{}条", civilCodeMap.size()); + logger.info("[行政区划] 加载成功,共加载数据{}条", civilCodePoList.size()); } } - - public CivilCodePo getParentCode(String code) { - if (code.length() > 8) { - return null; - } - if (code.length() == 8) { - String parentCode = code.substring(0, 6); - return civilCodeMap.get(parentCode); - }else { - CivilCodePo civilCodePo = civilCodeMap.get(code); - if (civilCodePo == null){ - return null; - } - String parentCode = civilCodePo.getParentCode(); - if (parentCode == null) { - return null; - } - return civilCodeMap.get(parentCode); - } - - } - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 80844353..3205498f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -498,6 +498,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements String endTimeStr = DateUtil.urlFormatter.format(end); String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr; SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0,false, false, device.getStreamModeForParam()); + sendRtpItem.setStream(stream); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), @@ -1006,7 +1007,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements Media media = mediaDescription.getMedia(); Vector mediaFormats = media.getMediaFormats(false); - if (mediaFormats.contains("8")) { +// if (mediaFormats.contains("8")) { port = media.getMediaPort(); String protocol = media.getProtocol(); // 区分TCP发流还是udp, 当前默认udp @@ -1022,7 +1023,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } break; - } +// } } if (port == -1) { logger.info("不支持的媒体格式,返回415"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index 4ea7667d..cd97786d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -108,7 +108,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent }else { event = eventElement.getText().toUpperCase(); } - DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event, civilCodeFileConf); + DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); if (channel == null) { logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); continue; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java index 6ab75d20..c910451d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java @@ -1,17 +1,14 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; -import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatchStatus; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; -import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.service.IPlayService; import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.Element; @@ -77,15 +74,6 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId); audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite); audioBroadcastManager.update(audioBroadcastCatch); - // 等待invite消息, 超时则结束 - String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId(); - if (!SipUtils.isFrontEnd(device.getDeviceId())) { - key += audioBroadcastCatch.getChannelId(); - } - dynamicTask.startDelay(key, ()->{ - logger.info("[语音广播]等待invite消息超时:{}/{}", device.getDeviceId(), channelId); - playService.stopAudioBroadcast(device.getDeviceId(), channelId); - }, 2000); }else { playService.stopAudioBroadcast(device.getDeviceId(), channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 19dde718..c16d7f56 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; -import com.genersoft.iot.vmp.conf.CivilCodeFileConf; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.session.CatalogDataCatch; @@ -57,9 +56,6 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @Autowired private ThreadPoolTaskExecutor taskExecutor; - @Autowired - private CivilCodeFileConf civilCodeFileConf; - @Autowired private SipConfig sipConfig; private AtomicBoolean processing = new AtomicBoolean(false); @@ -118,7 +114,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp if (channelDeviceElement == null) { continue; } - DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, null, civilCodeFileConf); + DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, null); if (channel == null) { logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); continue; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java index 41e05707..9de1ef2b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java @@ -3,10 +3,10 @@ package com.genersoft.iot.vmp.gb28181.utils; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.CivilCodePo; -import com.genersoft.iot.vmp.conf.CivilCodeFileConf; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.utils.CivilCodeUtil; import com.genersoft.iot.vmp.utils.DateUtil; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; @@ -240,7 +240,7 @@ public class XmlUtil { CivilCode, BusinessGroup,VirtualOrganization,Other } - public static DeviceChannel channelContentHandler(Element itemDevice, Device device, String event, CivilCodeFileConf civilCodeFileConf){ + public static DeviceChannel channelContentHandler(Element itemDevice, Device device, String event){ DeviceChannel deviceChannel = new DeviceChannel(); deviceChannel.setDeviceId(device.getDeviceId()); Element channdelIdElement = itemDevice.element("DeviceID"); @@ -267,7 +267,7 @@ public class XmlUtil { } if(channelId.length() <= 8) { deviceChannel.setHasAudio(false); - CivilCodePo parentCode = civilCodeFileConf.getParentCode(channelId); + CivilCodePo parentCode = CivilCodeUtil.INSTANCE.getParentCode(channelId); if (parentCode != null) { deviceChannel.setParentId(parentCode.getCode()); deviceChannel.setCivilCode(parentCode.getCode()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java b/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java index 43f1a8a6..e4090977 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.github.pagehelper.PageInfo; import java.util.List; +import java.util.Map; /** * 级联国标平台关联流业务接口 @@ -71,4 +72,7 @@ public interface IGbStreamService { void delAllPlatformInfo(String platformId, String catalogId); List getGbChannelWithGbid(String gbId); + + Map getAllGBId(); + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index 333b7b31..10b1eff1 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -115,4 +115,7 @@ public interface IStreamPushService { */ ResourceBaseInfo getOverview(); + Map getAllAppAndStreamMap(); + + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 3da28450..1dc7db44 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -575,8 +575,8 @@ public class DeviceServiceImpl implements IDeviceService { }else if (device.getSubscribeCycleForMobilePosition() == 0) { // 取消订阅 - deviceInStore.setSubscribeCycleForCatalog(0); - removeCatalogSubscribe(deviceInStore, null); + deviceInStore.setSubscribeCycleForMobilePosition(0); + removeMobilePositionSubscribe(deviceInStore, null); } } if (deviceInStore.getGeoCoordSys() != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java index 9fcbb407..c2c9d725 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java @@ -19,11 +19,11 @@ import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import java.util.ArrayList; import java.util.List; +import java.util.Map; @Service @DS("master") @@ -268,4 +268,9 @@ public class GbStreamServiceImpl implements IGbStreamService { public List getGbChannelWithGbid(String gbId) { return gbStreamMapper.selectByGBId(gbId); } + + @Override + public Map getAllGBId() { + return gbStreamMapper.getAllGBId(); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index fc9318b3..0df0d4d0 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -3,10 +3,7 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.baomidou.dynamic.datasource.annotation.DS; -import com.genersoft.iot.vmp.common.InviteInfo; -import com.genersoft.iot.vmp.common.InviteSessionStatus; -import com.genersoft.iot.vmp.common.InviteSessionType; -import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.*; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; @@ -18,19 +15,13 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.*; -import com.genersoft.iot.vmp.media.zlm.*; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRecordMp4; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; @@ -40,15 +31,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.*; -import com.genersoft.iot.vmp.service.bean.ErrorCallback; -import com.genersoft.iot.vmp.service.bean.InviteErrorCode; -import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; -import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; -import com.genersoft.iot.vmp.service.bean.DownloadFileInfo; -import com.genersoft.iot.vmp.service.bean.ErrorCallback; -import com.genersoft.iot.vmp.service.bean.InviteErrorCode; -import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper; @@ -1179,6 +1162,15 @@ public class PlayServiceImpl implements IPlayService { // 发送成功 AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, mediaServerItem, app, stream, event, AudioBroadcastCatchStatus.Ready, isFromPlatform); audioBroadcastManager.update(audioBroadcastCatch); + // 等待invite消息, 超时则结束 + String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId(); + if (!SipUtils.isFrontEnd(device.getDeviceId())) { + key += audioBroadcastCatch.getChannelId(); + } + dynamicTask.startDelay(key, ()->{ + logger.info("[语音广播]等待invite消息超时:{}/{}", device.getDeviceId(), channelId); + stopAudioBroadcast(device.getDeviceId(), channelId); + }, 2000); }, eventResultForError -> { // 发送失败 logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 13c452c1..e2d7e68c 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -548,4 +548,9 @@ public class StreamPushServiceImpl implements IStreamPushService { return new ResourceBaseInfo(total, online); } + + @Override + public Map getAllAppAndStreamMap() { + return streamPushMapper.getAllAppAndStreamMap(); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java index cb34ff59..65239c8f 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IMediaServerService; @@ -19,6 +20,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -57,7 +59,8 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener { try { List streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class); //查询全部的app+stream 用于判断是添加还是修改 - List allAppAndStream = streamPushService.getAllAppAndStream(); + Map allAppAndStream = streamPushService.getAllAppAndStreamMap(); + Map allGBId = gbStreamService.getAllGBId(); /** * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 @@ -67,9 +70,15 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener { for (StreamPushItem streamPushItem : streamPushItems) { String app = streamPushItem.getApp(); String stream = streamPushItem.getStream(); - boolean contains = allAppAndStream.contains(app + stream); + boolean contains = allAppAndStream.containsKey(app + stream); //不存在就添加 if (!contains) { + if (allGBId.containsKey(streamPushItem.getGbId())) { + GbStream gbStream = allGBId.get(streamPushItem.getGbId()); + logger.warn("[REDIS消息-推流设备列表更新-INSERT] 国标编号重复: {}, 已分配给{}/{}", + streamPushItem.getGbId(), gbStream.getApp(), gbStream.getStream()); + continue; + } streamPushItem.setStreamType("push"); streamPushItem.setCreateTime(DateUtil.getNow()); streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); @@ -77,25 +86,31 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener { streamPushItem.setOriginTypeStr("rtsp_push"); streamPushItem.setTotalReaderCount("0"); streamPushItemForSave.add(streamPushItem); + allGBId.put(streamPushItem.getGbId(), streamPushItem); } else { + if (allGBId.containsKey(streamPushItem.getGbId())) { + GbStream gbStream = allGBId.get(streamPushItem.getGbId()); + logger.warn("[REDIS消息-推流设备列表更新-UPDATE] 国标编号重复: {}, 已分配给{}/{}", + streamPushItem.getGbId(), gbStream.getApp(), gbStream.getStream()); + continue; + } //存在就只修改 name和gbId streamPushItemForUpdate.add(streamPushItem); } } - if (streamPushItemForSave.size() > 0) { - + if (!streamPushItemForSave.isEmpty()) { logger.info("添加{}条",streamPushItemForSave.size()); logger.info(JSONObject.toJSONString(streamPushItemForSave)); streamPushService.batchAdd(streamPushItemForSave); } - if(streamPushItemForUpdate.size()>0){ + if(!streamPushItemForUpdate.isEmpty()){ logger.info("修改{}条",streamPushItemForUpdate.size()); logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); gbStreamService.updateGbIdOrName(streamPushItemForUpdate); } }catch (Exception e) { - logger.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); + logger.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", new String(message.getBody())); logger.error("[REDIS消息-推流设备列表更新] 异常内容: ", e); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java index 6591e3f9..3790bdab 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java @@ -10,6 +10,7 @@ import org.apache.ibatis.annotations.Param; import org.springframework.stereotype.Repository; import java.util.List; +import java.util.Map; @Mapper @Repository @@ -170,4 +171,7 @@ public interface GbStreamMapper { @Select("SELECT status FROM wvp_stream_push WHERE app=#{app} AND stream=#{stream}") Boolean selectStatusForPush(@Param("app") String app, @Param("stream") String stream); + @MapKey("gbId") + @Select("SELECT * from wvp_gb_stream") + Map getAllGBId(); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java index 682f07c6..daf21eff 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java @@ -7,6 +7,7 @@ import org.apache.ibatis.annotations.*; import org.springframework.stereotype.Repository; import java.util.List; +import java.util.Map; @Mapper @Repository @@ -195,4 +196,12 @@ public interface StreamPushMapper { "" + "") List getListIn(List streamPushItems); + + @MapKey("vhost") + @Select("SELECT CONCAT(wsp.app, wsp.stream) as vhost, wsp.app, wsp.stream, wgs.gb_id, wgs.name " + + " from wvp_stream_push wsp " + + " left join wvp_gb_stream wgs on wgs.app = wsp.app and wgs.stream = wsp.stream") + Map getAllAppAndStreamMap(); + + } diff --git a/src/main/java/com/genersoft/iot/vmp/utils/CivilCodeUtil.java b/src/main/java/com/genersoft/iot/vmp/utils/CivilCodeUtil.java new file mode 100644 index 00000000..ba23ab2e --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/utils/CivilCodeUtil.java @@ -0,0 +1,50 @@ +package com.genersoft.iot.vmp.utils; + +import com.genersoft.iot.vmp.common.CivilCodePo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public enum CivilCodeUtil { + + INSTANCE; + private final static Logger log = LoggerFactory.getLogger(CivilCodeUtil.class); + + // 用与消息的缓存 + private final Map civilCodeMap = new ConcurrentHashMap<>(); + + CivilCodeUtil() { + } + + public void add(List civilCodePoList) { + if (!civilCodePoList.isEmpty()) { + for (CivilCodePo civilCodePo : civilCodePoList) { + civilCodeMap.put(civilCodePo.getCode(), civilCodePo); + } + } + } + + public CivilCodePo getParentCode(String code) { + if (code.length() > 8) { + return null; + } + if (code.length() == 8) { + String parentCode = code.substring(0, 6); + return civilCodeMap.get(parentCode); + }else { + CivilCodePo civilCodePo = civilCodeMap.get(code); + if (civilCodePo == null){ + return null; + } + String parentCode = civilCodePo.getParentCode(); + if (parentCode == null) { + return null; + } + return civilCodeMap.get(parentCode); + } + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java b/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java index f8ba4f2b..2bb9bfb1 100755 --- a/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java @@ -35,6 +35,11 @@ public class DateUtil { */ private static final String ISO8601_ZONE_PATTERN = "yyyy-MM-dd'T'HH:mm:ssXXX"; + /** + * 兼容的时间格式 iso8601时间格式带毫秒 + */ + private static final String ISO8601_MILLISECOND_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS"; + /** * wvp内部统一时间格式 */ @@ -55,6 +60,8 @@ public class DateUtil { public static final DateTimeFormatter formatterCompatibleISO8601 = DateTimeFormatter.ofPattern(ISO8601_COMPATIBLE_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); public static final DateTimeFormatter formatterISO8601 = DateTimeFormatter.ofPattern(ISO8601_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); public static final DateTimeFormatter formatterZoneISO8601 = DateTimeFormatter.ofPattern(ISO8601_ZONE_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); + public static final DateTimeFormatter formatterMillisecondISO8601 = DateTimeFormatter.ofPattern(ISO8601_MILLISECOND_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); + public static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); public static final DateTimeFormatter DateFormatter = DateTimeFormatter.ofPattern(date_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); public static final DateTimeFormatter urlFormatter = DateTimeFormatter.ofPattern(URL_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); @@ -70,6 +77,8 @@ public class DateUtil { return formatter.format(formatterCompatibleISO8601.parse(formatTime)); } else if (verification(formatTime, formatterZoneISO8601)) { return formatter.format(formatterZoneISO8601.parse(formatTime)); + } else if (verification(formatTime, formatterMillisecondISO8601)) { + return formatter.format(formatterMillisecondISO8601.parse(formatTime)); } return formatter.format(formatterISO8601.parse(formatTime)); } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java index d4791801..e65a5795 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java @@ -3,10 +3,9 @@ package com.genersoft.iot.vmp.vmanager.gb28181.gbStream; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.security.JwtUtils; import com.genersoft.iot.vmp.gb28181.bean.GbStream; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IPlatformService; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.gb28181.gbStream.bean.GbStreamParam; import com.github.pagehelper.PageInfo; @@ -20,7 +19,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; -import java.util.ArrayList; import java.util.List; @Tag(name = "视频流关联到级联平台") @@ -34,6 +32,9 @@ public class GbStreamController { @Autowired private IGbStreamService gbStreamService; + @Autowired + private IStreamPushService service; + @Autowired private IPlatformService platformService;