diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 89ecb186..8a77f09f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -91,7 +91,7 @@ public class CatalogEventLister implements ApplicationListener { if (event.getDeviceChannels() != null) { deviceChannelList.addAll(event.getDeviceChannels()); } - if (event.getGbStreams() != null && event.getGbStreams().size() > 0){ + if (event.getGbStreams() != null && !event.getGbStreams().isEmpty()){ for (GbStream gbStream : event.getGbStreams()) { if (gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) { continue; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java index bd966b69..c195cd84 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java @@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.CatalogData; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; +import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; @@ -20,7 +21,7 @@ public class CatalogDataCatch { public static Map data = new ConcurrentHashMap<>(); @Autowired - private IVideoManagerStorage storager; + private IDeviceChannelService deviceChannelService; public void addReady(Device device, int sn ) { CatalogData catalogData = data.get(device.getDeviceId()); @@ -112,7 +113,7 @@ public class CatalogDataCatch { if ( catalogData.getLastTime().isBefore(instantBefore5S)) { // 超过五秒收不到消息任务超时, 只更新这一部分数据, 收到数据与声明的总数一致,则重置通道数据,数据不全则只对收到的数据做更新操作 if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) { - storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList()); + deviceChannelService.resetChannels(catalogData.getDevice(), catalogData.getChannelList()); String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条"; catalogData.setErrorMsg(errorMsg); }else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index a78404e2..2d5fcd70 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; +import com.genersoft.iot.vmp.service.ICommonGbChannelService; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.dom4j.DocumentException; @@ -58,6 +59,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent @Autowired private IDeviceChannelService deviceChannelService; + @Autowired + private ICommonGbChannelService commonGbChannelService; + @Autowired private DynamicTask dynamicTask; @@ -123,7 +127,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); updateChannelOnlineList.add(channel); if (updateChannelOnlineList.size() > 300) { - executeSaveForOnline(); + executeSaveForOnline(device); } if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 @@ -139,7 +143,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent }else { updateChannelOfflineList.add(channel); if (updateChannelOfflineList.size() > 300) { - executeSaveForOffline(); + executeSaveForOffline(device); } if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 @@ -155,7 +159,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent }else { updateChannelOfflineList.add(channel); if (updateChannelOfflineList.size() > 300) { - executeSaveForOffline(); + executeSaveForOffline(device); } if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 @@ -171,7 +175,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent }else { updateChannelOfflineList.add(channel); if (updateChannelOfflineList.size() > 300) { - executeSaveForOffline(); + executeSaveForOffline(device); } if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 @@ -188,7 +192,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent channel.setId(deviceChannel.getId()); updateChannelMap.put(channel.getChannelId(), channel); if (updateChannelMap.keySet().size() > 300) { - executeSaveForUpdate(); + executeSaveForUpdate(device); } }else { addChannelMap.put(channel.getChannelId(), channel); @@ -198,7 +202,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent } if (addChannelMap.keySet().size() > 300) { - executeSaveForAdd(); + executeSaveForAdd(device); } } @@ -212,7 +216,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); } if (deleteChannelList.size() > 300) { - executeSaveForDelete(); + executeSaveForDelete(device); } break; case CatalogEvent.UPDATE: @@ -224,12 +228,12 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent channel.setId(deviceChannelForUpdate.getId()); updateChannelMap.put(channel.getChannelId(), channel); if (updateChannelMap.keySet().size() > 300) { - executeSaveForUpdate(); + executeSaveForUpdate(device); } }else { addChannelMap.put(channel.getChannelId(), channel); if (addChannelMap.keySet().size() > 300) { - executeSaveForAdd(); + executeSaveForAdd(device); } if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 @@ -251,7 +255,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent || deleteChannelList.size() > 0) { if (!dynamicTask.contains(talkKey)) { - dynamicTask.startDelay(talkKey, this::executeSave, 1000); + dynamicTask.startDelay(talkKey, ()-> executeSave(device), 1000); } } } @@ -261,49 +265,64 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent } } - private void executeSave(){ - executeSaveForAdd(); - executeSaveForUpdate(); - executeSaveForDelete(); - executeSaveForOnline(); - executeSaveForOffline(); + private void executeSave(Device device){ + executeSaveForAdd(device); + executeSaveForUpdate(device); + executeSaveForDelete(device); + executeSaveForOnline(device); + executeSaveForOffline(device); dynamicTask.stop(talkKey); } - private void executeSaveForUpdate(){ - if (updateChannelMap.values().size() > 0) { + private void executeSaveForUpdate(Device device){ + if (!updateChannelMap.values().isEmpty()) { ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); updateChannelMap.clear(); deviceChannelService.batchUpdateChannel(deviceChannels); + if (device.isAutoSyncChannel()) { + commonGbChannelService.updateChannelFromGb28181DeviceInList(device, deviceChannels); + } } } - private void executeSaveForAdd(){ - if (addChannelMap.values().size() > 0) { + private void executeSaveForAdd(Device device){ + if (!addChannelMap.values().isEmpty()) { ArrayList deviceChannels = new ArrayList<>(addChannelMap.values()); addChannelMap.clear(); deviceChannelService.batchAddChannel(deviceChannels); + if (device.isAutoSyncChannel()) { + commonGbChannelService.addChannelFromGb28181DeviceInList(device, deviceChannels); + } } } - private void executeSaveForDelete(){ - if (deleteChannelList.size() > 0) { + private void executeSaveForDelete(Device device){ + if (!deleteChannelList.isEmpty()) { deviceChannelService.deleteChannels(deleteChannelList); + if (device.isAutoSyncChannel()) { + commonGbChannelService.deleteGbChannelsFromList(deleteChannelList); + } deleteChannelList.clear(); } } - private void executeSaveForOnline(){ - if (updateChannelOnlineList.size() > 0) { + private void executeSaveForOnline(Device device){ + if (!updateChannelOnlineList.isEmpty()) { deviceChannelService.channelsOnline(updateChannelOnlineList); + if (device.isAutoSyncChannel()) { + commonGbChannelService.channelsOnlineFromList(deleteChannelList); + } updateChannelOnlineList.clear(); } } - private void executeSaveForOffline(){ - if (updateChannelOfflineList.size() > 0) { + private void executeSaveForOffline(Device device){ + if (!updateChannelOfflineList.isEmpty()) { deviceChannelService.channelsOffline(updateChannelOfflineList); + if (device.isAutoSyncChannel()) { + commonGbChannelService.channelsOfflineFromList(deleteChannelList); + } updateChannelOfflineList.clear(); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 5c977a50..691e107e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessag import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; +import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.DocumentException; @@ -49,6 +50,9 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @Autowired private IVideoManagerStorage storager; + @Autowired + private IDeviceChannelService deviceChannelService; + @Autowired private CatalogDataCatch catalogDataCatch; @@ -136,7 +140,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) { // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理, // 目前支持设备通道上线通知时和设备上线时向上级通知 - boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId())); + boolean resetChannelsResult = deviceChannelService.resetChannels(take.getDevice(), catalogDataCatch.get(take.getDevice().getDeviceId())); if (!resetChannelsResult) { String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(take.getDevice().getDeviceId()).size() + "条"; catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg); diff --git a/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java index d4195462..a0291b60 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java @@ -1,8 +1,10 @@ package com.genersoft.iot.vmp.service; import com.genersoft.iot.vmp.common.CommonGbChannel; +import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import java.util.ArrayList; import java.util.List; public interface ICommonGbChannelService { @@ -25,9 +27,21 @@ public interface ICommonGbChannelService { * @param gbDeviceId 国标设备编号 * @param syncKeys 要同步的字段 */ - boolean SyncChannelFromGb28181Device(String gbDeviceId, List syncKeys, Boolean syncGroup, Boolean syncRegion); + boolean syncChannelFromGb28181Device(String gbDeviceId, List syncKeys, Boolean syncGroup, Boolean syncRegion); + + CommonGbChannel getCommonChannelFromDeviceChannel(DeviceChannel deviceChannel, List syncKeys); List getChannelsInRegion(String civilCode); List getChannelsInBusinessGroup(String businessGroupID); + + void updateChannelFromGb28181DeviceInList(Device device, List deviceChannels); + + void addChannelFromGb28181DeviceInList(Device device, List deviceChannels); + + void deleteGbChannelsFromList(List deleteChannelList); + + void channelsOnlineFromList(List deleteChannelList); + + void channelsOfflineFromList(List deleteChannelList); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java index cd402a10..f109d4b4 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java @@ -87,4 +87,9 @@ public interface IDeviceChannelService { * 直接批量添加 */ void batchAddChannel(List deviceChannels); + + /** + * 重置通道 + */ + boolean resetChannels(Device device, List deviceChannelList); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java index 32b1f361..fc51581c 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service.impl; import com.genersoft.iot.vmp.common.CommonGbChannel; +import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.service.ICommonGbChannelService; import com.genersoft.iot.vmp.service.bean.CommonGbChannelType; @@ -75,7 +76,7 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { } @Override - public boolean SyncChannelFromGb28181Device(String gbDeviceId, List syncKeys, Boolean syncGroup, Boolean syncRegion) { + public boolean syncChannelFromGb28181Device(String gbDeviceId, List syncKeys, Boolean syncGroup, Boolean syncRegion) { logger.info("同步通用通道]来自国标设备,国标编号: {}", gbDeviceId); List deviceChannels = deviceChannelMapper.queryAllChannels(gbDeviceId); if (deviceChannels.isEmpty()) { @@ -133,7 +134,8 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { return result; } - private CommonGbChannel getCommonChannelFromDeviceChannel(DeviceChannel deviceChannel, List syncKeys) { + @Override + public CommonGbChannel getCommonChannelFromDeviceChannel(DeviceChannel deviceChannel, List syncKeys) { if (deviceChannel == null) { return null; } @@ -278,4 +280,29 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { public List getChannelsInBusinessGroup(String businessGroupID) { return null; } + + @Override + public void updateChannelFromGb28181DeviceInList(Device device, List deviceChannels) { + + } + + @Override + public void addChannelFromGb28181DeviceInList(Device device, List deviceChannels) { + + } + + @Override + public void deleteGbChannelsFromList(List deleteChannelList) { + commonGbChannelMapper.deleteByDeviceIDs(deleteChannelList); + } + + @Override + public void channelsOnlineFromList(List deleteChannelList) { + + } + + @Override + public void channelsOfflineFromList(List deleteChannelList) { + + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index 57de6310..a44dc89d 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -4,12 +4,16 @@ import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.utils.Coordtransform; +import com.genersoft.iot.vmp.service.ICommonGbChannelService; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.storager.dao.DeviceMapper; +import com.genersoft.iot.vmp.storager.dao.PlatformChannelMapper; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce; @@ -17,10 +21,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.interceptor.TransactionAspectSupport; +import org.springframework.util.CollectionUtils; +import org.springframework.util.ObjectUtils; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; /** @@ -40,9 +47,18 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { @Autowired private DeviceChannelMapper channelMapper; + @Autowired + private PlatformChannelMapper platformChannelMapper; + @Autowired private DeviceMapper deviceMapper; + @Autowired + EventPublisher eventPublisher; + + @Autowired + ICommonGbChannelService commonGbChannelService; + @Override public DeviceChannel updateGps(DeviceChannel deviceChannel, Device device) { if (deviceChannel.getLongitude()*deviceChannel.getLatitude() > 0) { @@ -261,5 +277,139 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { } } + @Override + @Transactional + public boolean resetChannels(Device device, List deviceChannelList) { + if (CollectionUtils.isEmpty(deviceChannelList)) { + return false; + } + List allChannels = channelMapper.queryAllChannels(device.getDeviceId()); + Map allChannelMap = new ConcurrentHashMap<>(); + if (allChannels.size() > 0) { + for (DeviceChannel deviceChannel : allChannels) { + allChannelMap.put(deviceChannel.getChannelId(), deviceChannel); + } + } + // 数据去重 + List channels = new ArrayList<>(); + List updateChannels = new ArrayList<>(); + List addChannels = new ArrayList<>(); + + StringBuilder stringBuilder = new StringBuilder(); + Map subContMap = new HashMap<>(); + + // 数据去重 + Set gbIdSet = new HashSet<>(); + for (DeviceChannel deviceChannel : deviceChannelList) { + if (gbIdSet.contains(deviceChannel.getChannelId())) { + stringBuilder.append(deviceChannel.getChannelId()).append(","); + continue; + } + gbIdSet.add(deviceChannel.getChannelId()); + if (allChannelMap.containsKey(deviceChannel.getChannelId())) { + deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId()); + deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio()); + deviceChannel.setCommonGbChannelId(allChannelMap.get(deviceChannel.getChannelId()).getCommonGbChannelId()); + if (allChannelMap.get(deviceChannel.getChannelId()).isStatus() !=deviceChannel.isStatus()){ + List strings = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getChannelId()); + if (!CollectionUtils.isEmpty(strings)){ + strings.forEach(platformId->{ + eventPublisher.catalogEventPublish(platformId, deviceChannel, deviceChannel.isStatus()? CatalogEvent.ON:CatalogEvent.OFF); + }); + } + + } + deviceChannel.setUpdateTime(DateUtil.getNow()); + updateChannels.add(deviceChannel); + }else { + deviceChannel.setCreateTime(DateUtil.getNow()); + deviceChannel.setUpdateTime(DateUtil.getNow()); + addChannels.add(deviceChannel); + } + channels.add(deviceChannel); + if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) { + if (subContMap.get(deviceChannel.getParentId()) == null) { + subContMap.put(deviceChannel.getParentId(), 1); + }else { + Integer count = subContMap.get(deviceChannel.getParentId()); + subContMap.put(deviceChannel.getParentId(), count++); + } + } + } + if (channels.size() > 0) { + for (DeviceChannel channel : channels) { + if (subContMap.get(channel.getChannelId()) != null){ + Integer count = subContMap.get(channel.getChannelId()); + if (count > 0) { + channel.setSubCount(count); + channel.setParental(1); + } + } + } + } + + if (stringBuilder.length() > 0) { + logger.info("[目录查询]收到的数据存在重复: {}" , stringBuilder); + } + if(CollectionUtils.isEmpty(channels)){ + logger.info("通道重设,数据为空={}" , deviceChannelList); + return false; + } + try { + int limitCount = 50; + int cleanChannelsResult = 0; + if (channels.size() > limitCount) { + for (int i = 0; i < channels.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > channels.size()) { + toIndex = channels.size(); + } + cleanChannelsResult += channelMapper.cleanChannelsNotInList(device.getDeviceId(), channels.subList(i, toIndex)); + } + } else { + cleanChannelsResult = channelMapper.cleanChannelsNotInList(device.getDeviceId(), channels); + } + boolean result = cleanChannelsResult < 0; + if (!result && addChannels.size() > 0) { + if (addChannels.size() > limitCount) { + for (int i = 0; i < addChannels.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > addChannels.size()) { + toIndex = addChannels.size(); + } + result = result || channelMapper.batchAdd(addChannels.subList(i, toIndex)) < 0; + } + }else { + result = result || channelMapper.batchAdd(addChannels) < 0; + } + } + if (!result && updateChannels.size() > 0) { + if (updateChannels.size() > limitCount) { + for (int i = 0; i < updateChannels.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > updateChannels.size()) { + toIndex = updateChannels.size(); + } + result = result || channelMapper.batchUpdate(updateChannels.subList(i, toIndex)) < 0; + } + }else { + result = result || channelMapper.batchUpdate(updateChannels) < 0; + } + } + + if (result) { + //事务回滚 + TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); + } + if (device.isAutoSyncChannel()) { + commonGbChannelService.syncChannelFromGb28181Device(device.getDeviceId(), null, true, true); + } + return true; + }catch (Exception e) { + logger.error("未处理的异常 ", e); + TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); + return false; + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java index cbeebedb..fd8d6a0a 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java @@ -323,13 +323,6 @@ public interface IVideoManagerStorage { */ StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId); - /** - * catlog查询结束后完全重写通道信息 - * @param deviceId - * @param deviceChannelList - */ - boolean resetChannels(String deviceId, List deviceChannelList); - /** * 获取目录信息 * @param platformId diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonGbChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonGbChannelMapper.java index 44ac660a..4c90ff18 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonGbChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonGbChannelMapper.java @@ -288,4 +288,5 @@ public interface CommonGbChannelMapper { " #{item.commonGbChannelId}" + "") int deleteByDeviceIDs(List clearChannels); + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index bef817fe..fe88eee8 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -108,138 +108,6 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { return deviceMapper.getDeviceByDeviceId(deviceId) != null; } - @Override - public boolean resetChannels(String deviceId, List deviceChannelList) { - if (CollectionUtils.isEmpty(deviceChannelList)) { - return false; - } - List allChannels = deviceChannelMapper.queryAllChannels(deviceId); - Map allChannelMap = new ConcurrentHashMap<>(); - if (allChannels.size() > 0) { - for (DeviceChannel deviceChannel : allChannels) { - allChannelMap.put(deviceChannel.getChannelId(), deviceChannel); - } - } - TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); - // 数据去重 - List channels = new ArrayList<>(); - - List updateChannels = new ArrayList<>(); - List addChannels = new ArrayList<>(); - StringBuilder stringBuilder = new StringBuilder(); - Map subContMap = new HashMap<>(); - - // 数据去重 - Set gbIdSet = new HashSet<>(); - for (DeviceChannel deviceChannel : deviceChannelList) { - if (gbIdSet.contains(deviceChannel.getChannelId())) { - stringBuilder.append(deviceChannel.getChannelId()).append(","); - continue; - } - gbIdSet.add(deviceChannel.getChannelId()); - if (allChannelMap.containsKey(deviceChannel.getChannelId())) { - deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId()); - deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio()); - if (allChannelMap.get(deviceChannel.getChannelId()).isStatus() !=deviceChannel.isStatus()){ - List strings = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getChannelId()); - if (!CollectionUtils.isEmpty(strings)){ - strings.forEach(platformId->{ - eventPublisher.catalogEventPublish(platformId, deviceChannel, deviceChannel.isStatus()?CatalogEvent.ON:CatalogEvent.OFF); - }); - } - - } - deviceChannel.setUpdateTime(DateUtil.getNow()); - updateChannels.add(deviceChannel); - }else { - deviceChannel.setCreateTime(DateUtil.getNow()); - deviceChannel.setUpdateTime(DateUtil.getNow()); - addChannels.add(deviceChannel); - } - channels.add(deviceChannel); - if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) { - if (subContMap.get(deviceChannel.getParentId()) == null) { - subContMap.put(deviceChannel.getParentId(), 1); - }else { - Integer count = subContMap.get(deviceChannel.getParentId()); - subContMap.put(deviceChannel.getParentId(), count++); - } - } - } - if (channels.size() > 0) { - for (DeviceChannel channel : channels) { - if (subContMap.get(channel.getChannelId()) != null){ - Integer count = subContMap.get(channel.getChannelId()); - if (count > 0) { - channel.setSubCount(count); - channel.setParental(1); - } - } - } - } - - if (stringBuilder.length() > 0) { - logger.info("[目录查询]收到的数据存在重复: {}" , stringBuilder); - } - if(CollectionUtils.isEmpty(channels)){ - logger.info("通道重设,数据为空={}" , deviceChannelList); - return false; - } - try { - int limitCount = 50; - int cleanChannelsResult = 0; - if (channels.size() > limitCount) { - for (int i = 0; i < channels.size(); i += limitCount) { - int toIndex = i + limitCount; - if (i + limitCount > channels.size()) { - toIndex = channels.size(); - } - cleanChannelsResult += this.deviceChannelMapper.cleanChannelsNotInList(deviceId, channels.subList(i, toIndex)); - } - } else { - cleanChannelsResult = this.deviceChannelMapper.cleanChannelsNotInList(deviceId, channels); - } - boolean result = cleanChannelsResult < 0; - if (!result && addChannels.size() > 0) { - if (addChannels.size() > limitCount) { - for (int i = 0; i < addChannels.size(); i += limitCount) { - int toIndex = i + limitCount; - if (i + limitCount > addChannels.size()) { - toIndex = addChannels.size(); - } - result = result || deviceChannelMapper.batchAdd(addChannels.subList(i, toIndex)) < 0; - } - }else { - result = result || deviceChannelMapper.batchAdd(addChannels) < 0; - } - } - if (!result && updateChannels.size() > 0) { - if (updateChannels.size() > limitCount) { - for (int i = 0; i < updateChannels.size(); i += limitCount) { - int toIndex = i + limitCount; - if (i + limitCount > updateChannels.size()) { - toIndex = updateChannels.size(); - } - result = result || deviceChannelMapper.batchUpdate(updateChannels.subList(i, toIndex)) < 0; - } - }else { - result = result || deviceChannelMapper.batchUpdate(updateChannels) < 0; - } - } - - if (result) { - //事务回滚 - dataSourceTransactionManager.rollback(transactionStatus); - } - dataSourceTransactionManager.commit(transactionStatus); //手动提交 - return true; - }catch (Exception e) { - logger.error("未处理的异常 ", e); - dataSourceTransactionManager.rollback(transactionStatus); - return false; - } - - } @Override public void deviceChannelOnline(String deviceId, String channelId) { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/channel/CommonChannelController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/channel/CommonChannelController.java index e6ed68d6..c5fb45af 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/channel/CommonChannelController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/channel/CommonChannelController.java @@ -67,6 +67,6 @@ public class CommonChannelController { System.out.println("syncKeys===" + Arrays.toString(syncKeys)); System.out.println("syncGroup===" + syncGroup); System.out.println("syncRegion===" + syncRegion); - return commonGbChannelService.SyncChannelFromGb28181Device(deviceId, Lists.newArrayList(syncKeys), syncGroup, syncRegion); + return commonGbChannelService.syncChannelFromGb28181Device(deviceId, Lists.newArrayList(syncKeys), syncGroup, syncRegion); } }