From 2a2a3943f4073b51c53ab5aab69f16fe6d503e32 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 26 Sep 2024 12:39:34 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96notify=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../NotifyRequestForCatalogProcessor.java | 70 +++++++++---------- ...tifyRequestForMobilePositionProcessor.java | 10 ++- .../vmp/service/IDeviceChannelService.java | 7 +- .../impl/DeviceChannelServiceImpl.java | 68 +++++++++++++++--- .../vmp/storager/dao/DeviceChannelMapper.java | 4 +- 5 files changed, 102 insertions(+), 57 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index 155d5f73..5e2261ca 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; @@ -42,8 +41,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class); - private final List updateChannelOnlineList = new CopyOnWriteArrayList<>(); - private final List updateChannelOfflineList = new CopyOnWriteArrayList<>(); + private final List updateChannelForStatusChange = new CopyOnWriteArrayList<>(); +// private final List updateChannelOfflineList = new CopyOnWriteArrayList<>(); private final Map updateChannelMap = new ConcurrentHashMap<>(); private final Map addChannelMap = new ConcurrentHashMap<>(); @@ -63,9 +62,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent @Autowired private IDeviceChannelService deviceChannelService; - @Autowired - private DynamicTask dynamicTask; - @Autowired private SipConfig sipConfig; @@ -77,13 +73,24 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent } taskQueue.offer(new HandlerCatchData(evt, null, null)); } +// @Scheduled(fixedRate = 2000) //每400毫秒执行一次 +// public void showSize(){ +// logger.warn("[notify-目录订阅] 待处理消息数量: {}", taskQueue.size() ); +// } @Scheduled(fixedRate = 400) //每400毫秒执行一次 public void executeTaskQueue(){ if (taskQueue.isEmpty()) { return; } - for (HandlerCatchData take : taskQueue) { + List handlerCatchDataList = new ArrayList<>(); + while (!taskQueue.isEmpty()) { + handlerCatchDataList.add(taskQueue.poll()); + } + if (handlerCatchDataList.isEmpty()) { + return; + } + for (HandlerCatchData take : handlerCatchDataList) { if (take == null) { continue; } @@ -130,12 +137,15 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent channel.setParentId(null); } channel.setDeviceId(device.getDeviceId()); - logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId()); + if (logger.isDebugEnabled()){ + logger.debug("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId()); + } switch (event) { case CatalogEvent.ON: // 上线 logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - updateChannelOnlineList.add(channel); + channel.setStatus(true); + updateChannelForStatusChange.add(channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); @@ -147,7 +157,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent if (userSetting.getRefuseChannelStatusChannelFormNotify()) { logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); } else { - updateChannelOfflineList.add(channel); + channel.setStatus(false); + updateChannelForStatusChange.add(channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); @@ -160,7 +171,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent if (userSetting.getRefuseChannelStatusChannelFormNotify()) { logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); } else { - updateChannelOfflineList.add(channel); + channel.setStatus(false); + updateChannelForStatusChange.add(channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); @@ -173,7 +185,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent if (userSetting.getRefuseChannelStatusChannelFormNotify()) { logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); } else { - updateChannelOfflineList.add(channel); + channel.setStatus(false); + updateChannelForStatusChange.add(channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); @@ -211,6 +224,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent case CatalogEvent.UPDATE: // 更新 logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + // 判断此通道是否存在 DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId()); if (deviceChannelForUpdate != null) { @@ -242,8 +256,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent taskQueue.clear(); if (!updateChannelMap.keySet().isEmpty() || !addChannelMap.keySet().isEmpty() - || !updateChannelOnlineList.isEmpty() - || !updateChannelOfflineList.isEmpty() + || !updateChannelForStatusChange.isEmpty() || !deleteChannelList.isEmpty()) { executeSave(); } @@ -256,14 +269,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent logger.error("[存储收到的增加通道] 异常: ", e ); } try { - executeSaveForOnline(); + executeSaveForStatus(); } catch (Exception e) { - logger.error("[存储收到的通道上线] 异常: ", e ); - } - try { - executeSaveForOffline(); - } catch (Exception e) { - logger.error("[存储收到的通道离线] 异常: ", e ); + logger.error("[存储收到的通道状态变化] 异常: ", e ); } try { executeSaveForUpdate(); @@ -301,22 +309,10 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent } } - private void executeSaveForOnline(){ - if (!updateChannelOnlineList.isEmpty()) { - deviceChannelService.channelsOnline(updateChannelOnlineList); - updateChannelOnlineList.clear(); + private void executeSaveForStatus(){ + if (!updateChannelForStatusChange.isEmpty()) { + deviceChannelService.updateChannelsStaus(updateChannelForStatusChange); + updateChannelForStatusChange.clear(); } } - - private void executeSaveForOffline(){ - if (!updateChannelOfflineList.isEmpty()) { - deviceChannelService.channelsOffline(updateChannelOfflineList); - updateChannelOfflineList.clear(); - } - } - -// @Scheduled(fixedRate = 10000) //每1秒执行一次 -// public void execute(){ -// logger.info("[待处理Notify-目录订阅消息数量]: {}", taskQueue.size()); -// } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 75a3b0d2..9d24f3ea 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -25,6 +25,7 @@ import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -68,7 +69,14 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor if (taskQueue.isEmpty()) { return; } - for (HandlerCatchData take : taskQueue) { + List handlerCatchDataList = new ArrayList<>(); + while (!taskQueue.isEmpty()) { + handlerCatchDataList.add(taskQueue.poll()); + } + if (handlerCatchDataList.isEmpty()) { + return; + } + for (HandlerCatchData take : handlerCatchDataList) { if (take == null) { continue; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java index a8cd5813..39b5939a 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java @@ -67,12 +67,7 @@ public interface IDeviceChannelService { /** * 批量上线 */ - int channelsOnline(List channels); - - /** - * 批量下线 - */ - int channelsOffline(List channels); + int updateChannelsStaus(List channels); /** * 获取一个通道 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index 58e7b02a..a82d7ebd 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -239,13 +239,45 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { } @Override - public int deleteChannels(List deleteChannelList) { - return channelMapper.batchDel(deleteChannelList); + @Transactional + public int deleteChannels(List channels) { + int limitCount = 1000; + int result = 0; + if (!channels.isEmpty()) { + if (channels.size() > limitCount) { + for (int i = 0; i < channels.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > channels.size()) { + toIndex = channels.size(); + } + result += channelMapper.batchDel(channels.subList(i, toIndex)); + } + }else { + result += channelMapper.batchDel(channels); + } + } + return result; } @Override - public int channelsOnline(List channels) { - return channelMapper.batchOnline(channels); + @Transactional + public int updateChannelsStaus(List channels) { + int limitCount = 1000; + int result = 0; + if (!channels.isEmpty()) { + if (channels.size() > limitCount) { + for (int i = 0; i < channels.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > channels.size()) { + toIndex = channels.size(); + } + result += channelMapper.batchUpdateStatus(channels.subList(i, toIndex)); + } + }else { + result += channelMapper.batchUpdateStatus(channels); + } + } + return result; } @Override @@ -253,12 +285,6 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { channelMapper.online(channel.getDeviceId(), channel.getChannelId()); } - @Override - public int channelsOffline(List channels) { - return channelMapper.batchOffline(channels); - } - - @Override public void offline(DeviceChannel channel) { channelMapper.offline(channel.getDeviceId(), channel.getChannelId()); @@ -275,6 +301,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { } @Override + @Transactional public synchronized void batchUpdateChannel(List channels) { String now = DateUtil.getNow(); for (DeviceChannel channel : channels) { @@ -297,8 +324,27 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { } @Override + @Transactional public void batchAddChannel(List channels) { - channelMapper.batchAdd(channels); + String now = DateUtil.getNow(); + for (DeviceChannel channel : channels) { + channel.setUpdateTime(now); + channel.setCreateTime(now); + } + int limitCount = 1000; + if (!channels.isEmpty()) { + if (channels.size() > limitCount) { + for (int i = 0; i < channels.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > channels.size()) { + toIndex = channels.size(); + } + channelMapper.batchAdd(channels.subList(i, toIndex)); + } + }else { + channelMapper.batchAdd(channels); + } + } for (DeviceChannel channel : channels) { if (channel.getParentId() != null) { channelMapper.updateChannelSubCount(channel.getDeviceId(), channel.getParentId()); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index 15552211..0172be2b 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -479,10 +479,10 @@ public interface DeviceChannelMapper { @Update({""}) - int batchOnline(@Param("channels") List channels); + int batchUpdateStatus(@Param("channels") List channels); @Update({"