From dd6beba843ca6d5c924b789ba9cc2413f549e456 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 30 Oct 2024 14:21:32 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96Catalog=E6=8E=A5=E6=94=B6,?= =?UTF-8?q?=E6=8F=90=E5=8D=87=E5=A4=A7=E9=87=8F=E8=AE=BE=E5=A4=87=E5=90=8C?= =?UTF-8?q?=E6=97=B6=E6=8E=A5=E5=85=A5=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/DeviceChannelServiceImpl.java | 2 +- .../service/impl/DeviceServiceImpl.java | 4 +- .../gb28181/session/CatalogDataManager.java | 92 +++++++++++-------- .../cmd/CatalogResponseMessageHandler.java | 30 +++--- 4 files changed, 72 insertions(+), 56 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java index b86d25e0a..d68008ee4 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java @@ -37,7 +37,6 @@ import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; /** * @author lin @@ -470,6 +469,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { if (CollectionUtils.isEmpty(deviceChannelList)) { return false; } + System.out.println("size: " + deviceChannelList.size()); List allChannels = channelMapper.queryAllChannelsForRefresh(deviceDbId); Map allChannelMap = new HashMap<>(); if (!allChannels.isEmpty()) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 742bf1e49..a6ae07eaf 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -335,12 +335,12 @@ public class DeviceServiceImpl implements IDeviceService { try { sipCommander.catalogQuery(device, sn, event -> { String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg); - catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg); + catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), sn, errorMsg); }); } catch (SipException | InvalidArgumentException | ParseException e) { log.error("[同步通道], 信令发送失败:{}", e.getMessage() ); String errorMsg = String.format("同步通道失败,信令发送失败: %s", e.getMessage()); - catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg); + catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), sn, errorMsg); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java index d0a7dc12c..d53ffaeea 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java @@ -1,7 +1,5 @@ package com.genersoft.iot.vmp.gb28181.session; -import com.genersoft.iot.vmp.common.InviteInfo; -import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.service.IGroupService; @@ -9,16 +7,16 @@ import com.genersoft.iot.vmp.gb28181.service.IRegionService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; -import org.springframework.data.redis.core.Cursor; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.core.ScanOptions; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.time.Instant; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; @Slf4j @@ -41,8 +39,12 @@ public class CatalogDataManager implements CommandLineRunner { private final String key = "VMP_CATALOG_DATA"; + public String buildMapKey(String deviceId, int sn ) { + return deviceId + "_" + sn; + } + public void addReady(Device device, int sn ) { - CatalogData catalogData = dataMap.get(device.getDeviceId()); + CatalogData catalogData = dataMap.get(buildMapKey(device.getDeviceId(),sn)); if (catalogData != null) { Set redisKeysForChannel = catalogData.getRedisKeysForChannel(); if (redisKeysForChannel != null && !redisKeysForChannel.isEmpty()) { @@ -62,19 +64,19 @@ public class CatalogDataManager implements CommandLineRunner { redisTemplate.opsForHash().delete(key, deleteKey); } } - dataMap.remove(device.getDeviceId()); + dataMap.remove(buildMapKey(device.getDeviceId(),sn)); } catalogData = new CatalogData(); catalogData.setDevice(device); catalogData.setSn(sn); catalogData.setStatus(CatalogData.CatalogDataStatus.ready); catalogData.setTime(Instant.now()); - dataMap.put(device.getDeviceId(), catalogData); + dataMap.put(buildMapKey(device.getDeviceId(),sn), catalogData); } public void put(String deviceId, int sn, int total, Device device, List deviceChannelList, List regionList, List groupList) { - CatalogData catalogData = dataMap.get(device.getDeviceId()); + CatalogData catalogData = dataMap.get(buildMapKey(device.getDeviceId(),sn)); if (catalogData == null ) { log.warn("[缓存-Catalog] 未找到缓存对象,可能已经结束"); return; @@ -108,9 +110,9 @@ public class CatalogDataManager implements CommandLineRunner { } } - public List getDeviceChannelList(String deviceId) { + public List getDeviceChannelList(String deviceId, int sn) { List result = new ArrayList<>(); - CatalogData catalogData = dataMap.get(deviceId); + CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn)); if (catalogData == null ) { log.warn("[Redis-Catalog] 未找到缓存对象,可能已经结束"); return result; @@ -124,9 +126,9 @@ public class CatalogDataManager implements CommandLineRunner { return result; } - public List getRegionList(String deviceId) { + public List getRegionList(String deviceId, int sn) { List result = new ArrayList<>(); - CatalogData catalogData = dataMap.get(deviceId); + CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn)); if (catalogData == null ) { log.warn("[Redis-Catalog] 未找到缓存对象,可能已经结束"); return result; @@ -140,9 +142,9 @@ public class CatalogDataManager implements CommandLineRunner { return result; } - public List getGroupList(String deviceId) { + public List getGroupList(String deviceId, int sn) { List result = new ArrayList<>(); - CatalogData catalogData = dataMap.get(deviceId); + CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn)); if (catalogData == null ) { log.warn("[Redis-Catalog] 未找到缓存对象,可能已经结束"); return result; @@ -157,28 +159,40 @@ public class CatalogDataManager implements CommandLineRunner { } public SyncStatus getSyncStatus(String deviceId) { - CatalogData catalogData = dataMap.get(deviceId); - if (catalogData == null) { + if (dataMap.isEmpty()) { return null; } - SyncStatus syncStatus = new SyncStatus(); - syncStatus.setCurrent(catalogData.getRedisKeysForChannel().size()); - syncStatus.setTotal(catalogData.getTotal()); - syncStatus.setErrorMsg(catalogData.getErrorMsg()); - if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) { - syncStatus.setSyncIng(false); - }else { - syncStatus.setSyncIng(true); + Set keySet = dataMap.keySet(); + for (String key : keySet) { + CatalogData catalogData = dataMap.get(key); + if (catalogData != null && deviceId.equals(catalogData.getDevice().getDeviceId())) { + SyncStatus syncStatus = new SyncStatus(); + syncStatus.setCurrent(catalogData.getRedisKeysForChannel().size()); + syncStatus.setTotal(catalogData.getTotal()); + syncStatus.setErrorMsg(catalogData.getErrorMsg()); + if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) { + syncStatus.setSyncIng(false); + }else { + syncStatus.setSyncIng(true); + } + return syncStatus; + } } - return syncStatus; + return null; } public boolean isSyncRunning(String deviceId) { - CatalogData catalogData = dataMap.get(deviceId); - if (catalogData == null) { + if (dataMap.isEmpty()) { return false; } - return !catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end); + Set keySet = dataMap.keySet(); + for (String key : keySet) { + CatalogData catalogData = dataMap.get(key); + if (catalogData != null && deviceId.equals(catalogData.getDevice().getDeviceId())) { + return !catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end); + } + } + return false; } @Override @@ -202,17 +216,17 @@ public class CatalogDataManager implements CommandLineRunner { if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) { String deviceId = catalogData.getDevice().getDeviceId(); int sn = catalogData.getSn(); - List deviceChannelList = getDeviceChannelList(deviceId); + List deviceChannelList = getDeviceChannelList(deviceId, sn); if (catalogData.getTotal() == deviceChannelList.size()) { deviceChannelService.resetChannels(catalogData.getDevice().getId(), deviceChannelList); }else { deviceChannelService.updateChannels(catalogData.getDevice(), deviceChannelList); } - List regionList = getRegionList(deviceId); + List regionList = getRegionList(deviceId, sn); if ( regionList!= null && !regionList.isEmpty()) { regionService.batchAdd(regionList); } - List groupList = getGroupList(deviceId); + List groupList = getGroupList(deviceId, sn); if (groupList != null && !groupList.isEmpty()) { groupService.batchAdd(groupList); } @@ -248,8 +262,8 @@ public class CatalogDataManager implements CommandLineRunner { } - public void setChannelSyncEnd(String deviceId, String errorMsg) { - CatalogData catalogData = dataMap.get(deviceId); + public void setChannelSyncEnd(String deviceId, int sn, String errorMsg) { + CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn)); if (catalogData == null) { return; } @@ -258,16 +272,16 @@ public class CatalogDataManager implements CommandLineRunner { catalogData.setTime(Instant.now()); } - public int size(String deviceId) { - CatalogData catalogData = dataMap.get(deviceId); + public int size(String deviceId, int sn) { + CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn)); if (catalogData == null) { return 0; } return catalogData.getRedisKeysForChannel().size(); } - public int sumNum(String deviceId) { - CatalogData catalogData = dataMap.get(deviceId); + public int sumNum(String deviceId, int sn) { + CatalogData catalogData = dataMap.get(buildMapKey(deviceId,sn)); if (catalogData == null) { return 0; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 26d087789..0dc38a4ac 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -96,6 +96,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp continue; } RequestEvent evt = take.getEvt(); + int sn = 0; // 全局异常捕获,保证下一条可以得到处理 try { Element rootElement = null; @@ -118,7 +119,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp log.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId()); // 数据已经完整接收 deviceChannelService.cleanChannelsForDevice(take.getDevice().getId()); - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), sn, null); } else { Iterator deviceListIterator = deviceListElement.elementIterator(); if (deviceListIterator != null) { @@ -132,6 +133,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp if (channelDeviceElement == null) { continue; } + // 从xml解析内容到 DeviceChannel 对象 DeviceChannel channel = DeviceChannel.decode(itemDevice); if (channel.getDeviceId() == null) { log.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); @@ -158,25 +160,25 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp } channelList.add(channel); } - int sn = Integer.parseInt(snElement.getText()); + sn = Integer.parseInt(snElement.getText()); catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList, regionList, groupList); - log.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.size(take.getDevice().getDeviceId()), sumNum); + log.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.size(take.getDevice().getDeviceId(), sn), sumNum); } } } catch (Exception e) { log.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest()); log.error("[收到通道] 异常内容: ", e); } finally { - if (catalogDataCatch.size(take.getDevice().getDeviceId()) == catalogDataCatch.sumNum(take.getDevice().getDeviceId())) { + if (catalogDataCatch.size(take.getDevice().getDeviceId(), sn) == catalogDataCatch.sumNum(take.getDevice().getDeviceId(), sn)) { // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理, // 目前支持设备通道上线通知时和设备上线时向上级通知 - boolean resetChannelsResult = saveData(take.getDevice()); + boolean resetChannelsResult = saveData(take.getDevice(), sn); if (!resetChannelsResult) { - String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(take.getDevice().getDeviceId()) + "条,已接收" + catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size() + "条"; - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg); + String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(take.getDevice().getDeviceId(), sn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId(), sn).size() + "条"; + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), sn, errorMsg); } else { - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), sn, null); } } } @@ -184,20 +186,20 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp } @Transactional - public boolean saveData(Device device) { + public boolean saveData(Device device, int sn) { boolean result = true; - List deviceChannelList = catalogDataCatch.getDeviceChannelList(device.getDeviceId()); + List deviceChannelList = catalogDataCatch.getDeviceChannelList(device.getDeviceId(), sn); if (deviceChannelList != null && !deviceChannelList.isEmpty()) { result &= deviceChannelService.resetChannels(device.getId(), deviceChannelList); } - List regionList = catalogDataCatch.getRegionList(device.getDeviceId()); + List regionList = catalogDataCatch.getRegionList(device.getDeviceId(), sn); if ( regionList!= null && !regionList.isEmpty()) { result &= regionService.batchAdd(regionList); } - List groupList = catalogDataCatch.getGroupList(device.getDeviceId()); + List groupList = catalogDataCatch.getGroupList(device.getDeviceId(), sn); if (groupList != null && !groupList.isEmpty()) { result &= groupService.batchAdd(groupList); } @@ -221,7 +223,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp catalogDataCatch.addReady(device, sn); } - public void setChannelSyncEnd(String deviceId, String errorMsg) { - catalogDataCatch.setChannelSyncEnd(deviceId, errorMsg); + public void setChannelSyncEnd(String deviceId, int sn, String errorMsg) { + catalogDataCatch.setChannelSyncEnd(deviceId, sn, errorMsg); } }