From 8cba63642fcff122907bd7d7a8d7d822555d34ca Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 22 Apr 2024 20:29:36 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96notify=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/conf/UserSetting.java | 2 +- .../NotifyRequestForCatalogProcessor.java | 26 ++- ...tifyRequestForMobilePositionProcessor.java | 163 ++++++++++-------- .../request/impl/NotifyRequestProcessor.java | 24 ++- .../impl/DeviceChannelServiceImpl.java | 4 +- .../vmp/storager/dao/DeviceChannelMapper.java | 4 +- .../dao/DeviceMobilePositionMapper.java | 15 ++ 7 files changed, 143 insertions(+), 95 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index a9f5c88a..a9b17aef 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -66,7 +66,7 @@ public class UserSetting { private List allowedOrigins = new ArrayList<>(); - private int maxNotifyCountQueue = 10000; + private int maxNotifyCountQueue = 100000; private int registerAgainAfterTime = 60; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index cde70eb4..c80cc88f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -96,10 +96,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent // 遍历DeviceList while (deviceListIterator.hasNext()) { Element itemDevice = deviceListIterator.next(); - Element channelDeviceElement = itemDevice.element("DeviceID"); - if (channelDeviceElement == null) { - continue; - } Element eventElement = itemDevice.element("Event"); String event; if (eventElement == null) { @@ -264,22 +260,13 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent } } + // TODO 同一个通道如果先发送更新再发送离线可能无法正常离线 private void executeSave(){ try { executeSaveForAdd(); } catch (Exception e) { logger.error("[存储收到的增加通道] 异常: ", e ); } - try { - executeSaveForUpdate(); - } catch (Exception e) { - logger.error("[存储收到的更新通道] 异常: ", e ); - } - try { - executeSaveForDelete(); - } catch (Exception e) { - logger.error("[存储收到的删除通道] 异常: ", e ); - } try { executeSaveForOnline(); } catch (Exception e) { @@ -290,6 +277,17 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent } catch (Exception e) { logger.error("[存储收到的通道离线] 异常: ", e ); } + try { + executeSaveForUpdate(); + } catch (Exception e) { + logger.error("[存储收到的更新通道] 异常: ", e ); + } + try { + executeSaveForDelete(); + } catch (Exception e) { + logger.error("[存储收到的删除通道] 异常: ", e ); + } + dynamicTask.stop(talkKey); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 89f57c2a..460a5078 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -11,7 +11,6 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -20,12 +19,12 @@ import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -68,78 +67,100 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor private final static String talkKey = "notify-request-for-mobile-position-task"; +// @Async("taskExecutor") public void process(RequestEvent evt) { try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - + long startTime = System.currentTimeMillis(); // 回复 200 OK Element rootElement = getRootElement(evt); if (rootElement == null) { logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest()); return; } - + Device device = redisCatchStorage.getDevice(deviceId); MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); + List elements = rootElement.elements(); + String channelId = null; + for (Element element : elements) { + switch (element.getName()){ + case "DeviceID": + channelId = element.getStringValue(); + if (device == null) { + device = redisCatchStorage.getDevice(channelId); + if (device == null) { + // 根据通道id查询设备Id + List deviceList = deviceChannelService.getDeviceByChannelId(channelId); + if (!deviceList.isEmpty()) { + device = deviceList.get(0); + } + } + } + if (device == null) { + logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId); + return; + } + mobilePosition.setDeviceId(device.getDeviceId()); + mobilePosition.setChannelId(channelId); + // 兼容设备部分设备上报是通道编号与设备编号一致的情况 + if (deviceId.equals(channelId)) { + List deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId); + if (deviceChannels.size() == 1) { + channelId = deviceChannels.get(0).getChannelId(); + } + } + if (!ObjectUtils.isEmpty(device.getName())) { + mobilePosition.setDeviceName(device.getName()); + } + mobilePosition.setDeviceId(device.getDeviceId()); + mobilePosition.setChannelId(channelId); + continue; + case "Time": + String timeVal = element.getStringValue(); + if (ObjectUtils.isEmpty(timeVal)) { + mobilePosition.setTime(DateUtil.getNow()); + } else { + mobilePosition.setTime(SipUtils.parseTime(timeVal)); + } + continue; + case "Longitude": + mobilePosition.setLongitude(Double.parseDouble(element.getStringValue())); + continue; + case "Latitude": + mobilePosition.setLatitude(Double.parseDouble(element.getStringValue())); + continue; + case "Speed": + String speedVal = element.getStringValue(); + if (NumericUtil.isDouble(speedVal)) { + mobilePosition.setSpeed(Double.parseDouble(speedVal)); + } else { + mobilePosition.setSpeed(0.0); + } + continue; + case "Direction": + String directionVal = element.getStringValue(); + if (NumericUtil.isDouble(directionVal)) { + mobilePosition.setDirection(Double.parseDouble(directionVal)); + } else { + mobilePosition.setDirection(0.0); + } + continue; + case "Altitude": + String altitudeVal = element.getStringValue(); + if (NumericUtil.isDouble(altitudeVal)) { + mobilePosition.setAltitude(Double.parseDouble(altitudeVal)); + } else { + mobilePosition.setAltitude(0.0); + } + continue; - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getTextTrim().toString(); - Device device = redisCatchStorage.getDevice(deviceId); - - if (device == null) { - device = redisCatchStorage.getDevice(channelId); - if (device == null) { - // 根据通道id查询设备Id - List deviceList = deviceChannelService.getDeviceByChannelId(channelId); - if (deviceList.size() > 0) { - device = deviceList.get(0); - } } } - if (device == null) { - logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId); - return; - } - // 兼容设备部分设备上报是通道编号与设备编号一致的情况 - if (deviceId.equals(channelId)) { - List deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId); - if (deviceChannels.size() == 1) { - channelId = deviceChannels.get(0).getChannelId(); - } - } - if (!ObjectUtils.isEmpty(device.getName())) { - mobilePosition.setDeviceName(device.getName()); - } - mobilePosition.setDeviceId(device.getDeviceId()); - mobilePosition.setChannelId(channelId); - String time = XmlUtil.getText(rootElement, "Time"); - if (ObjectUtils.isEmpty(time)) { - mobilePosition.setTime(DateUtil.getNow()); - } else { - mobilePosition.setTime(SipUtils.parseTime(time)); - } - - mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); - mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) { - mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed"))); - } else { - mobilePosition.setSpeed(0.0); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) { - mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction"))); - } else { - mobilePosition.setDirection(0.0); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) { - mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude"))); - } else { - mobilePosition.setAltitude(0.0); - } - logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), - mobilePosition.getLongitude(), mobilePosition.getLatitude()); +// logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), +// mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); mobilePosition.setReportSource("Mobile Position"); // 更新device channel 的经纬度 @@ -149,13 +170,13 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); deviceChannel.setGpsTime(mobilePosition.getTime()); - updateChannelMap.put(channelId, deviceChannel); + updateChannelMap.put(deviceId + channelId, deviceChannel); addMobilePositionList.add(mobilePosition); - if(updateChannelMap.size() > 300) { + if(updateChannelMap.size() > 100) { executeSaveChannel(); } if (userSetting.isSavePositionHistory()) { - if(addMobilePositionList.size() > 300) { + if(addMobilePositionList.size() > 100) { executeSaveMobilePosition(); } } @@ -170,7 +191,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor // deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); if (!dynamicTask.contains(talkKey)) { - dynamicTask.startDelay(talkKey, this::executeSave, 1000); + dynamicTask.startDelay(talkKey, this::executeSave, 3000); } } catch (DocumentException e) { @@ -186,22 +207,24 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor dynamicTask.stop(talkKey); } - private void executeSaveChannel(){ + @Async("taskExecutor") + public void executeSaveChannel(){ + dynamicTask.execute(); try { logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size()); - ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); - deviceChannelService.batchUpdateChannelGPS(deviceChannels); +// ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); +// deviceChannelService.batchUpdateChannelGPS(deviceChannels); updateChannelMap.clear(); }catch (Exception e) { } } - - private void executeSaveMobilePosition(){ + @Async("taskExecutor") + public void executeSaveMobilePosition(){ if (userSetting.isSavePositionHistory()) { try { - logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size()); - deviceChannelService.batchAddMobilePosition(addMobilePositionList); +// logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size()); +// deviceChannelService.batchAddMobilePosition(addMobilePositionList); addMobilePositionList.clear(); }catch (Exception e) { logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size()); @@ -209,6 +232,4 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor } } - - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 84f44b58..2dd107a7 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -25,6 +25,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; @@ -76,6 +78,9 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @Autowired private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor; + @Autowired + private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor; + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @@ -105,10 +110,10 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements logger.error("未处理的异常 ", e); } boolean runed = !taskQueue.isEmpty(); - logger.info("[notify] 待处理消息数量: {}", taskQueue.size()); taskQueue.offer(new HandlerCatchData(evt, null, null)); if (!runed) { taskExecutor.execute(()-> { +// logger.warn("开始处理"); while (!taskQueue.isEmpty()) { try { HandlerCatchData take = taskQueue.poll(); @@ -129,8 +134,12 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements logger.info("接收到Alarm通知"); processNotifyAlarm(take.getEvt()); } else if (CmdType.MOBILE_POSITION.equals(cmd)) { - logger.info("接收到MobilePosition通知"); - processNotifyMobilePosition(take.getEvt()); +// logger.info("接收到MobilePosition通知"); +// processNotifyMobilePosition(take.getEvt()); + taskExecutor.execute(() -> { + notifyRequestForMobilePositionProcessor.process(take.getEvt()); + }); + } else { logger.info("接收到消息:" + cmd); } @@ -147,11 +156,11 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements * * @param evt */ - private void processNotifyMobilePosition(RequestEvent evt) { + @Async("taskExecutor") + public void processNotifyMobilePosition(RequestEvent evt) { try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - // 回复 200 OK Element rootElement = getRootElement(evt); if (rootElement == null) { @@ -360,4 +369,9 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { this.redisCatchStorage = redisCatchStorage; } + + @Scheduled(fixedRate = 1000) //每1秒执行一次 + public void execute(){ + System.out.println("待处理消息数量: " + taskQueue.size()); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index 37603812..f7666409 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -356,11 +356,11 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { @Override public void batchUpdateChannelGPS(List channelList) { - + channelMapper.batchUpdate(channelList); } @Override public void batchAddMobilePosition(List mobilePositions) { - + deviceMobilePositionMapper.batchadd(mobilePositions); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index 4aa98530..ae223367 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -347,8 +347,8 @@ public interface DeviceChannelMapper { ", has_audio=#{item.hasAudio}" + ", longitude=#{item.longitude}" + ", latitude=#{item.latitude}" + - ", custom_longitude=#{item.customLongitude}" + - ", custom_latitude=#{item.customLatitude}" + + ", custom_longitude=#{item.customLongitude}" + + ", custom_latitude=#{item.customLatitude}" + ", longitude_gcj02=#{item.longitudeGcj02}" + ", latitude_gcj02=#{item.latitudeGcj02}" + ", longitude_wgs84=#{item.longitudeWgs84}" + diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java index 7bf243ca..e3d89826 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java @@ -33,4 +33,19 @@ public interface DeviceMobilePositionMapper { @Delete("DELETE FROM wvp_device_mobile_position WHERE device_id = #{deviceId}") int clearMobilePositionsByDeviceId(String deviceId); + + @Insert("") + void batchadd(List mobilePositions); + }