diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java index a92cbeea..6ff32168 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java @@ -688,18 +688,10 @@ public interface DeviceChannelMapper { @Update({""}) - int batchOnlineForNotify(List channels); - - @Update({""}) - int batchOfflineForNotify(List channels); - + int batchUpdateStatus(List channels); @Select("select count(1) from wvp_device_channel where status = 'ON'") int getOnlineCount(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceChannelService.java index f09cbb4f..8a4b7d89 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceChannelService.java @@ -3,8 +3,8 @@ package com.genersoft.iot.vmp.gb28181.service; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; -import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce; +import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend; import com.github.pagehelper.PageInfo; @@ -47,15 +47,7 @@ public interface IDeviceChannelService { */ int deleteChannelsForNotify(List deleteChannelList); - /** - * 批量上线 - */ - int channelsOnlineForNotify(List channels); - - /** - * 批量下线 - */ - int channelsOfflineForNotify(List channels); + int updateChannelsStatus(List channels); /** * 获取一个通道 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java index 32e73e8f..fce13e9a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java @@ -193,13 +193,45 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { } @Override - public int deleteChannelsForNotify(List deleteChannelList) { - return channelMapper.batchDelForNotify(deleteChannelList); + @Transactional + public int deleteChannelsForNotify(List channels) { + int limitCount = 1000; + int result = 0; + if (!channels.isEmpty()) { + if (channels.size() > limitCount) { + for (int i = 0; i < channels.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > channels.size()) { + toIndex = channels.size(); + } + result += channelMapper.batchDel(channels.subList(i, toIndex)); + } + }else { + result += channelMapper.batchDel(channels); + } + } + return result; } + @Transactional @Override - public int channelsOnlineForNotify(List channels) { - return channelMapper.batchOnlineForNotify(channels); + public int updateChannelsStatus(List channels) { + int limitCount = 1000; + int result = 0; + if (!channels.isEmpty()) { + if (channels.size() > limitCount) { + for (int i = 0; i < channels.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > channels.size()) { + toIndex = channels.size(); + } + result += channelMapper.batchUpdateStatus(channels.subList(i, toIndex)); + } + }else { + result += channelMapper.batchUpdateStatus(channels); + } + } + return result; } @Override @@ -207,12 +239,6 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { channelMapper.online(channel.getId()); } - @Override - public int channelsOfflineForNotify(List channels) { - return channelMapper.batchOfflineForNotify(channels); - } - - @Override public void offline(DeviceChannel channel) { channelMapper.offline(channel.getId()); @@ -242,6 +268,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { } @Override + @Transactional public synchronized void batchUpdateChannelForNotify(List channels) { String now = DateUtil.getNow(); for (DeviceChannel channel : channels) { @@ -264,8 +291,27 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { } @Override + @Transactional public void batchAddChannel(List channels) { - channelMapper.batchAdd(channels); + String now = DateUtil.getNow(); + for (DeviceChannel channel : channels) { + channel.setUpdateTime(now); + channel.setCreateTime(now); + } + int limitCount = 1000; + if (!channels.isEmpty()) { + if (channels.size() > limitCount) { + for (int i = 0; i < channels.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > channels.size()) { + toIndex = channels.size(); + } + channelMapper.batchAdd(channels.subList(i, toIndex)); + } + }else { + channelMapper.batchAdd(channels); + } + } for (DeviceChannel channel : channels) { if (channel.getParentId() != null) { channelMapper.updateChannelSubCount(channel.getDeviceDbId(), channel.getParentId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index 6a504219..9e9cc35e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -39,8 +39,7 @@ import java.util.concurrent.CopyOnWriteArrayList; @Component public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent { - private final List updateChannelOnlineList = new CopyOnWriteArrayList<>(); - private final List updateChannelOfflineList = new CopyOnWriteArrayList<>(); + private final List updateChannelForStatusChange = new CopyOnWriteArrayList<>(); private final Map updateChannelMap = new ConcurrentHashMap<>(); private final Map addChannelMap = new ConcurrentHashMap<>(); @@ -60,6 +59,10 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent @Autowired private IDeviceChannelService deviceChannelService; +// @Scheduled(fixedRate = 2000) //每400毫秒执行一次 +// public void showSize(){ +// log.warn("[notify-目录订阅] 待处理消息数量: {}", taskQueue.size() ); +// } @Transactional public void process(RequestEvent evt) { @@ -75,7 +78,14 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent if (taskQueue.isEmpty()) { return; } - for (HandlerCatchData take : taskQueue) { + List handlerCatchDataList = new ArrayList<>(); + while (!taskQueue.isEmpty()) { + handlerCatchDataList.add(taskQueue.poll()); + } + if (handlerCatchDataList.isEmpty()) { + return; + } + for (HandlerCatchData take : handlerCatchDataList) { if (take == null) { continue; } @@ -119,14 +129,17 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent log.error("[解析CatalogChannelEvent]失败原文: \n{}", new String(evt.getRequest().getRawContent(), Charset.forName(device.getCharset()))); continue; } - - log.info("[收到目录订阅]:{}/{}-{}", device.getDeviceId(), - catalogChannelEvent.getChannel().getDeviceId(), catalogChannelEvent.getEvent()); + if (log.isDebugEnabled()){ + log.debug("[收到目录订阅]:{}/{}-{}", device.getDeviceId(), + catalogChannelEvent.getChannel().getDeviceId(), catalogChannelEvent.getEvent()); + } + DeviceChannel channel = catalogChannelEvent.getChannel(); switch (catalogChannelEvent.getEvent()) { case CatalogEvent.ON: // 上线 log.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId()); - updateChannelOnlineList.add(catalogChannelEvent.getChannel()); + channel.setStatus("ON"); + updateChannelForStatusChange.add(channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), true); @@ -138,7 +151,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent if (userSetting.getRefuseChannelStatusChannelFormNotify()) { log.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId()); } else { - updateChannelOfflineList.add(catalogChannelEvent.getChannel()); + channel.setStatus("OFF"); + updateChannelForStatusChange.add(channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), false); @@ -151,7 +165,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent if (userSetting.getRefuseChannelStatusChannelFormNotify()) { log.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId()); } else { - updateChannelOfflineList.add(catalogChannelEvent.getChannel()); + channel.setStatus("OFF"); + updateChannelForStatusChange.add(channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), false); @@ -164,7 +179,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent if (userSetting.getRefuseChannelStatusChannelFormNotify()) { log.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId()); } else { - updateChannelOfflineList.add(catalogChannelEvent.getChannel()); + channel.setStatus("OFF"); + updateChannelForStatusChange.add(channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), false); @@ -178,7 +194,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, catalogChannelEvent.getChannel().getDeviceId()); if (deviceChannel != null) { log.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId()); - DeviceChannel channel = catalogChannelEvent.getChannel(); channel.setId(deviceChannel.getId()); channel.setHasAudio(deviceChannel.isHasAudio()); channel.setUpdateTime(DateUtil.getNow()); @@ -210,7 +225,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent // 判断此通道是否存在 DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, catalogChannelEvent.getChannel().getDeviceId()); if (deviceChannelForUpdate != null) { - DeviceChannel channel = catalogChannelEvent.getChannel(); channel.setId(deviceChannelForUpdate.getId()); channel.setHasAudio(deviceChannelForUpdate.isHasAudio()); channel.setUpdateTime(DateUtil.getNow()); @@ -242,8 +256,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent taskQueue.clear(); if (!updateChannelMap.keySet().isEmpty() || !addChannelMap.keySet().isEmpty() - || !updateChannelOnlineList.isEmpty() - || !updateChannelOfflineList.isEmpty() + || !updateChannelForStatusChange.isEmpty() || !deleteChannelList.isEmpty()) { executeSave(); } @@ -256,14 +269,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent log.error("[存储收到的增加通道] 异常: ", e ); } try { - executeSaveForOnline(); + executeSaveForStatus(); } catch (Exception e) { - log.error("[存储收到的通道上线] 异常: ", e ); - } - try { - executeSaveForOffline(); - } catch (Exception e) { - log.error("[存储收到的通道离线] 异常: ", e ); + log.error("[存储收到的通道状态变化] 异常: ", e ); } try { executeSaveForUpdate(); @@ -301,17 +309,10 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent } } - private void executeSaveForOnline(){ - if (!updateChannelOnlineList.isEmpty()) { - deviceChannelService.channelsOnlineForNotify(updateChannelOnlineList); - updateChannelOnlineList.clear(); - } - } - - private void executeSaveForOffline(){ - if (!updateChannelOfflineList.isEmpty()) { - deviceChannelService.channelsOfflineForNotify(updateChannelOfflineList); - updateChannelOfflineList.clear(); + private void executeSaveForStatus(){ + if (!updateChannelForStatusChange.isEmpty()) { + deviceChannelService.updateChannelsStatus(updateChannelForStatusChange); + updateChannelForStatusChange.clear(); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 5d376689..a792207b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -24,6 +24,7 @@ import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -65,7 +66,14 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor if (taskQueue.isEmpty()) { return; } - for (HandlerCatchData take : taskQueue) { + List handlerCatchDataList = new ArrayList<>(); + while (!taskQueue.isEmpty()) { + handlerCatchDataList.add(taskQueue.poll()); + } + if (handlerCatchDataList.isEmpty()) { + return; + } + for (HandlerCatchData take : handlerCatchDataList) { if (take == null) { continue; }