From bbbadccb799355956dcc3b8d3986117e47f6a4a7 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 11 Oct 2024 14:09:18 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96notify=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/bean/NotifyCatalogChannel.java | 38 ++ .../cmd/SIPRequestHeaderPlarformProvider.java | 2 +- .../cmd/impl/SIPCommanderFroPlatform.java | 4 +- .../NotifyRequestForCatalogProcessor.java | 502 +++++++++--------- ...tifyRequestForMobilePositionProcessor.java | 13 +- .../vmp/service/IDeviceChannelService.java | 8 +- .../impl/DeviceChannelServiceImpl.java | 17 +- .../vmp/storager/dao/DeviceChannelMapper.java | 70 ++- .../vmanager/gb28181/device/DeviceQuery.java | 2 +- 9 files changed, 375 insertions(+), 281 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/NotifyCatalogChannel.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/NotifyCatalogChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/NotifyCatalogChannel.java new file mode 100644 index 00000000..8961677d --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/NotifyCatalogChannel.java @@ -0,0 +1,38 @@ +package com.genersoft.iot.vmp.gb28181.bean; + + +public class NotifyCatalogChannel { + + private Type type; + + private DeviceChannel channel; + + + public enum Type { + ADD, DELETE, UPDATE, STATUS_CHANGED + } + + + public static NotifyCatalogChannel getInstance(Type type, DeviceChannel channel) { + NotifyCatalogChannel notifyCatalogChannel = new NotifyCatalogChannel(); + notifyCatalogChannel.setType(type); + notifyCatalogChannel.setChannel(channel); + return notifyCatalogChannel; + } + + public Type getType() { + return type; + } + + public void setType(Type type) { + this.type = type; + } + + public DeviceChannel getChannel() { + return channel; + } + + public void setChannel(DeviceChannel channel) { + this.channel = channel; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java index de32339a..abb95b56 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java @@ -39,7 +39,7 @@ public class SIPRequestHeaderPlarformProvider { @Autowired private SipConfig sipConfig; - + @Autowired private SipLayer sipLayer; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 02089d12..3a6bf99a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -596,7 +596,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channels, deviceChannels.size(), type, subscribeInfo); System.out.println(catalogXmlContent); - logger.info("[发送NOTIFY通知]类型: {},发送数量: {}", type, channels.size()); + logger.info("[发送NOTIFY通知]类型: {},平台:{}, 发送数量: {}", type, parentPlatform.getServerGBId(), channels.size()); sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> { logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg); }, (eventResult -> { @@ -702,7 +702,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { }else { channels = deviceChannels.subList(index, deviceChannels.size()); } - logger.info("[发送NOTIFY通知]类型: {},发送数量: {}", type, channels.size()); + logger.info("[发送NOTIFY通知]类型: {},平台:{}, 发送数量: {}", type, parentPlatform.getServerGBId(), channels.size()); Integer finalIndex = index; String catalogXmlContent = getCatalogXmlContentForCatalogOther(parentPlatform, channels, type); sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index 5e2261ca..e9ec1ea7 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; +import com.genersoft.iot.vmp.gb28181.bean.NotifyCatalogChannel; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; @@ -13,6 +14,7 @@ import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; +import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.DocumentException; import org.dom4j.Element; import org.slf4j.Logger; @@ -23,12 +25,12 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import javax.sip.RequestEvent; +import javax.sip.header.CSeqHeader; import javax.sip.header.FromHeader; +import javax.sip.message.Request; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; @@ -38,281 +40,257 @@ import java.util.concurrent.CopyOnWriteArrayList; @Component public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent { - private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class); - private final List updateChannelForStatusChange = new CopyOnWriteArrayList<>(); -// private final List updateChannelOfflineList = new CopyOnWriteArrayList<>(); - private final Map updateChannelMap = new ConcurrentHashMap<>(); + private final ConcurrentLinkedQueue channelList = new ConcurrentLinkedQueue<>(); - private final Map addChannelMap = new ConcurrentHashMap<>(); - private final List deleteChannelList = new CopyOnWriteArrayList<>(); + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + @Autowired + private UserSetting userSetting; - @Autowired - private UserSetting userSetting; + @Autowired + private EventPublisher eventPublisher; - @Autowired - private EventPublisher eventPublisher; + @Autowired + private IRedisCatchStorage redisCatchStorage; - @Autowired - private IRedisCatchStorage redisCatchStorage; + @Autowired + private IDeviceChannelService deviceChannelService; - @Autowired - private IDeviceChannelService deviceChannelService; + @Autowired + private SipConfig sipConfig; - @Autowired - private SipConfig sipConfig; - - @Transactional - public void process(RequestEvent evt) { - if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { - logger.error("[notify-目录订阅] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); - return; - } - taskQueue.offer(new HandlerCatchData(evt, null, null)); - } + @Transactional + public void process(RequestEvent evt) { + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + logger.error("[notify-目录订阅] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); + return; + } + taskQueue.offer(new HandlerCatchData(evt, null, null)); + } // @Scheduled(fixedRate = 2000) //每400毫秒执行一次 // public void showSize(){ // logger.warn("[notify-目录订阅] 待处理消息数量: {}", taskQueue.size() ); // } - @Scheduled(fixedRate = 400) //每400毫秒执行一次 - public void executeTaskQueue(){ - if (taskQueue.isEmpty()) { - return; - } - List handlerCatchDataList = new ArrayList<>(); - while (!taskQueue.isEmpty()) { - handlerCatchDataList.add(taskQueue.poll()); - } - if (handlerCatchDataList.isEmpty()) { - return; - } - for (HandlerCatchData take : handlerCatchDataList) { - if (take == null) { - continue; - } - RequestEvent evt = take.getEvt(); + @Scheduled(fixedRate = 400) //每400毫秒执行一次 + @Transactional + public void executeTaskQueue() { + if (taskQueue.isEmpty()) { + return; + } + List handlerCatchDataList = new ArrayList<>(); + int size = taskQueue.size(); + for (int i = 0; i < size; i++) { + HandlerCatchData poll = taskQueue.poll(); + if (poll != null) { + handlerCatchDataList.add(poll); + } + } +// while (!taskQueue.isEmpty()) { +// handlerCatchDataList.add(taskQueue.poll()); +// } + if (handlerCatchDataList.isEmpty()) { + return; + } + for (HandlerCatchData take : handlerCatchDataList) { + if (take == null) { + logger.warn("[收到目录订阅]:但是队列内任务为空"); + continue; + } + RequestEvent evt = take.getEvt(); + try { + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null || !device.isOnLine()) { + logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId() : "")); + continue; + } + Element rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest()); + continue; + } + Element deviceListElement = rootElement.element("DeviceList"); + if (deviceListElement == null) { + logger.warn("[ 收到目录订阅 ] 解析xml获取DeviceList失败, {}", evt.getRequest()); + continue; + } + Iterator deviceListIterator = deviceListElement.elementIterator(); + if (deviceListIterator != null) { + + // 遍历DeviceList + while (deviceListIterator.hasNext()) { + Element itemDevice = deviceListIterator.next(); + Element eventElement = itemDevice.element("Event"); + String event; + if (eventElement == null) { + logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId() : "")); + event = CatalogEvent.ADD; + } else { + event = eventElement.getText().toUpperCase(); + } + DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); + if (channel == null) { + logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); + continue; + } + if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { + channel.setParentId(null); + } + channel.setDeviceId(device.getDeviceId()); + if (logger.isDebugEnabled()) { + logger.debug("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId()); + } + switch (event) { + case CatalogEvent.ON: + // 上线 + logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + channel.setStatus(true); + channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel)); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); + } + break; + case CatalogEvent.OFF: + // 离线 + logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + } else { + channel.setStatus(false); + channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel)); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); + } + } + break; + case CatalogEvent.VLOST: + // 视频丢失 + logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + } else { + channel.setStatus(false); + channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel)); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); + } + } + break; + case CatalogEvent.DEFECT: + // 故障 + logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + } else { + channel.setStatus(false); + channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel)); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); + } + } + break; + case CatalogEvent.ADD: + // 增加 + logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + // 判断此通道是否存在 + DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId()); + if (deviceChannel != null) { + logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + channel.setId(deviceChannel.getId()); + channel.setHasAudio(deviceChannel.getHasAudio()); + channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.UPDATE, channel)); + } else { + channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.ADD, channel)); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); + } + } + + break; + case CatalogEvent.DEL: + // 删除 + logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.DELETE, channel)); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); + } + break; + case CatalogEvent.UPDATE: + // 更新 + logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}, 状态: {}", device.getDeviceId(), channel.getChannelId(), channel.isStatus()); + + // 判断此通道是否存在 + DeviceChannel deviceChannelInDb = deviceChannelService.getOne(deviceId, channel.getChannelId()); + if (deviceChannelInDb != null) { + channel.setId(deviceChannelInDb.getId()); + channel.setUpdateTime(DateUtil.getNow()); + channel.setHasAudio(deviceChannelInDb.getHasAudio()); + channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.UPDATE, channel)); + } else { + channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.ADD, channel)); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); + } + } + break; + default: + logger.warn("[ NotifyCatalog ] event not found : {}", event); + + } + // 转发变化信息 + eventPublisher.catalogEventPublish(null, channel, event); + } + } + + } catch (DocumentException e) { + logger.error("未处理的异常 ", e); + } + } + if (!channelList.isEmpty()) { + executeSave(); + } + } + + @Transactional + public void executeSave() { + int size = channelList.size(); + List channelListForSave = new ArrayList<>(); + for (int i = 0; i < size; i++) { + channelListForSave.add(channelList.poll()); + } + + for (NotifyCatalogChannel notifyCatalogChannel : channelListForSave) { try { - long start = System.currentTimeMillis(); - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null || !device.isOnLine()) { - logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId() : "")); - return; + switch (notifyCatalogChannel.getType()) { + case STATUS_CHANGED: + deviceChannelService.updateChannelStatus(notifyCatalogChannel.getChannel()); + break; + case ADD: + deviceChannelService.addChannel(notifyCatalogChannel.getChannel()); + break; + case UPDATE: + deviceChannelService.updateChannelForNotify(notifyCatalogChannel.getChannel()); + break; + case DELETE: + deviceChannelService.delete(notifyCatalogChannel.getChannel()); + break; } - Element rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest()); - return; - } - Element deviceListElement = rootElement.element("DeviceList"); - if (deviceListElement == null) { - return; - } - Iterator deviceListIterator = deviceListElement.elementIterator(); - if (deviceListIterator != null) { - - // 遍历DeviceList - while (deviceListIterator.hasNext()) { - Element itemDevice = deviceListIterator.next(); - Element eventElement = itemDevice.element("Event"); - String event; - if (eventElement == null) { - logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId() : "")); - event = CatalogEvent.ADD; - } else { - event = eventElement.getText().toUpperCase(); - } - DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); - if (channel == null) { - logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); - continue; - } - if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { - channel.setParentId(null); - } - channel.setDeviceId(device.getDeviceId()); - if (logger.isDebugEnabled()){ - logger.debug("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId()); - } - switch (event) { - case CatalogEvent.ON: - // 上线 - logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - channel.setStatus(true); - updateChannelForStatusChange.add(channel); - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); - } - break; - case CatalogEvent.OFF: - // 离线 - logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - } else { - channel.setStatus(false); - updateChannelForStatusChange.add(channel); - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); - } - } - break; - case CatalogEvent.VLOST: - // 视频丢失 - logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - } else { - channel.setStatus(false); - updateChannelForStatusChange.add(channel); - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); - } - } - break; - case CatalogEvent.DEFECT: - // 故障 - logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - } else { - channel.setStatus(false); - updateChannelForStatusChange.add(channel); - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); - } - } - break; - case CatalogEvent.ADD: - // 增加 - logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - // 判断此通道是否存在 - DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId()); - if (deviceChannel != null) { - logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - channel.setId(deviceChannel.getId()); - channel.setHasAudio(null); - updateChannelMap.put(channel.getChannelId(), channel); - } else { - addChannelMap.put(channel.getChannelId(), channel); - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); - } - } - - break; - case CatalogEvent.DEL: - // 删除 - logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - deleteChannelList.add(channel); - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); - } - break; - case CatalogEvent.UPDATE: - // 更新 - logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - - // 判断此通道是否存在 - DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId()); - if (deviceChannelForUpdate != null) { - channel.setId(deviceChannelForUpdate.getId()); - channel.setUpdateTime(DateUtil.getNow()); - channel.setHasAudio(null); - updateChannelMap.put(channel.getChannelId(), channel); - } else { - addChannelMap.put(channel.getChannelId(), channel); - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); - } - } - break; - default: - logger.warn("[ NotifyCatalog ] event not found : {}", event); - - } - // 转发变化信息 - eventPublisher.catalogEventPublish(null, channel, event); - } - } - - } catch (DocumentException e) { - logger.error("未处理的异常 ", e); + }catch (Exception e) { + logger.error("[存储收到的通道]类型:{},编号:{}", notifyCatalogChannel.getType(), + notifyCatalogChannel.getChannel().getDeviceId(), e); } - } - taskQueue.clear(); - if (!updateChannelMap.keySet().isEmpty() - || !addChannelMap.keySet().isEmpty() - || !updateChannelForStatusChange.isEmpty() - || !deleteChannelList.isEmpty()) { - executeSave(); - } - } - - public void executeSave(){ - try { - executeSaveForAdd(); - } catch (Exception e) { - logger.error("[存储收到的增加通道] 异常: ", e ); - } - try { - executeSaveForStatus(); - } catch (Exception e) { - logger.error("[存储收到的通道状态变化] 异常: ", e ); - } - try { - executeSaveForUpdate(); - } catch (Exception e) { - logger.error("[存储收到的更新通道] 异常: ", e ); - } - try { - executeSaveForDelete(); - } catch (Exception e) { - logger.error("[存储收到的删除通道] 异常: ", e ); - } - } - - private void executeSaveForUpdate(){ - if (!updateChannelMap.values().isEmpty()) { - logger.info("[存储收到的更新通道], 数量: {}", updateChannelMap.size()); - ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); - deviceChannelService.batchUpdateChannel(deviceChannels); - updateChannelMap.clear(); - } - } - - private void executeSaveForAdd(){ - if (!addChannelMap.values().isEmpty()) { - ArrayList deviceChannels = new ArrayList<>(addChannelMap.values()); - addChannelMap.clear(); - deviceChannelService.batchAddChannel(deviceChannels); - } - } - - private void executeSaveForDelete(){ - if (!deleteChannelList.isEmpty()) { - deviceChannelService.deleteChannels(deleteChannelList); - deleteChannelList.clear(); - } - } - - private void executeSaveForStatus(){ - if (!updateChannelForStatusChange.isEmpty()) { - deviceChannelService.updateChannelsStaus(updateChannelForStatusChange); - updateChannelForStatusChange.clear(); - } - } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 9d24f3ea..fe7fb2ab 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -21,6 +21,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; @@ -71,7 +72,10 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor } List handlerCatchDataList = new ArrayList<>(); while (!taskQueue.isEmpty()) { - handlerCatchDataList.add(taskQueue.poll()); + HandlerCatchData poll = taskQueue.poll(); + if (poll != null) { + handlerCatchDataList.add(poll); + } } if (handlerCatchDataList.isEmpty()) { return; @@ -89,12 +93,12 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor Element rootElement = getRootElement(evt); if (rootElement == null) { logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest()); - return; + continue; } Device device = redisCatchStorage.getDevice(deviceId); if (device == null) { logger.error("处理MobilePosition移动位置Notify时未获取到device,{}", deviceId); - return; + continue; } MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setDeviceId(device.getDeviceId()); @@ -151,7 +155,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor } } - logger.debug("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), + logger.debug("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); mobilePosition.setReportSource("Mobile Position"); @@ -194,7 +198,6 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor logger.error("未处理的异常 ", e); } } - taskQueue.clear(); } // @Scheduled(fixedRate = 10000) // public void execute(){ diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java index 39b5939a..e8270702 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java @@ -25,7 +25,7 @@ public interface IDeviceChannelService { * @param deviceId 设备id * @param channel 通道 */ - void updateChannel(String deviceId, DeviceChannel channel); + void updateChannelForNotify(String deviceId, DeviceChannel channel); /** * 批量添加设备通道 @@ -102,4 +102,10 @@ public interface IDeviceChannelService { void offline(DeviceChannel channel); void delete(DeviceChannel channel); + + void updateChannelStatus(DeviceChannel channel); + + void addChannel(DeviceChannel channel); + + void updateChannelForNotify(DeviceChannel channel); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index a82d7ebd..99d48309 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -96,7 +96,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { } @Override - public void updateChannel(String deviceId, DeviceChannel channel) { + public void updateChannelForNotify(String deviceId, DeviceChannel channel) { String channelId = channel.getChannelId(); channel.setDeviceId(deviceId); InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); @@ -116,6 +116,11 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { channelMapper.updateChannelSubCount(deviceId,channel.getParentId()); } + @Override + public void updateChannelForNotify(DeviceChannel channel) { + channelMapper.updateForNotify(channel); + } + @Override public int updateChannels(String deviceId, List channels) { List addChannels = new ArrayList<>(); @@ -280,6 +285,16 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { return result; } + @Override + public void updateChannelStatus(DeviceChannel channel) { + channelMapper.updateStatus(channel); + } + + @Override + public void addChannel(DeviceChannel channel) { + channelMapper.add(channel); + } + @Override public void online(DeviceChannel channel) { channelMapper.online(channel.getDeviceId(), channel.getChannelId()); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index 0172be2b..8f14cd61 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -17,14 +17,22 @@ import java.util.List; @Repository public interface DeviceChannelMapper { - @Insert("INSERT INTO wvp_device_channel (channel_id, device_id, name, manufacture, model, owner, civil_code, block, " + - "address, parental, parent_id, safety_way, register_way, cert_num, certifiable, err_code, secrecy, " + - "ip_address, port, password, ptz_type, status, stream_id, longitude, latitude, longitude_gcj02, latitude_gcj02, " + - "longitude_wgs84, latitude_wgs84, has_audio, create_time, update_time, business_group_id, gps_time, stream_identification) " + - "VALUES (#{channelId}, #{deviceId}, #{name}, #{manufacture}, #{model}, #{owner}, #{civilCode}, #{block}," + - "#{address}, #{parental}, #{parentId}, #{safetyWay}, #{registerWay}, #{certNum}, #{certifiable}, #{errCode}, #{secrecy}, " + - "#{ipAddress}, #{port}, #{password}, #{ptzType}, #{status}, #{streamId}, #{longitude}, #{latitude}, #{longitudeGcj02}, " + - "#{latitudeGcj02}, #{longitudeWgs84}, #{latitudeWgs84}, #{hasAudio}, #{createTime}, #{updateTime}, #{businessGroupId}, #{gpsTime}, #{streamIdentification})") + @Insert("") int add(DeviceChannel channel); @Update(value = {" "}) int update(DeviceChannel channel); + @Update(value = {" "}) + int updateForNotify(DeviceChannel channel); + @Select(value = {" "}) void batchUpdatePosition(List channelList); + @Update("UPDATE wvp_device_channel SET status=#{status} WHERE device_id=#{deviceId} AND channel_id=#{channelId}") + void updateStatus(DeviceChannel channel); + } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java index 8a2618bb..e847cd64 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java @@ -261,7 +261,7 @@ public class DeviceQuery { @Parameter(name = "channel", description = "通道信息", required = true) @PostMapping("/channel/update/{deviceId}") public void updateChannel(@PathVariable String deviceId,DeviceChannel channel){ - deviceChannelService.updateChannel(deviceId, channel); + deviceChannelService.updateChannelForNotify(deviceId, channel); } @Operation(summary = "修改通道的码流类型", security = @SecurityRequirement(name = JwtUtils.HEADER))