优化notify消息处理

2.7.0
648540858 2024-04-22 20:29:36 +08:00
parent b57dbeac13
commit 8cba63642f
7 changed files with 143 additions and 95 deletions

View File

@ -66,7 +66,7 @@ public class UserSetting {
private List<String> allowedOrigins = new ArrayList<>(); private List<String> allowedOrigins = new ArrayList<>();
private int maxNotifyCountQueue = 10000; private int maxNotifyCountQueue = 100000;
private int registerAgainAfterTime = 60; private int registerAgainAfterTime = 60;

View File

@ -96,10 +96,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
// 遍历DeviceList // 遍历DeviceList
while (deviceListIterator.hasNext()) { while (deviceListIterator.hasNext()) {
Element itemDevice = deviceListIterator.next(); Element itemDevice = deviceListIterator.next();
Element channelDeviceElement = itemDevice.element("DeviceID");
if (channelDeviceElement == null) {
continue;
}
Element eventElement = itemDevice.element("Event"); Element eventElement = itemDevice.element("Event");
String event; String event;
if (eventElement == null) { if (eventElement == null) {
@ -264,22 +260,13 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
} }
} }
// TODO 同一个通道如果先发送更新再发送离线可能无法正常离线
private void executeSave(){ private void executeSave(){
try { try {
executeSaveForAdd(); executeSaveForAdd();
} catch (Exception e) { } catch (Exception e) {
logger.error("[存储收到的增加通道] 异常: ", e ); logger.error("[存储收到的增加通道] 异常: ", e );
} }
try {
executeSaveForUpdate();
} catch (Exception e) {
logger.error("[存储收到的更新通道] 异常: ", e );
}
try {
executeSaveForDelete();
} catch (Exception e) {
logger.error("[存储收到的删除通道] 异常: ", e );
}
try { try {
executeSaveForOnline(); executeSaveForOnline();
} catch (Exception e) { } catch (Exception e) {
@ -290,6 +277,17 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
} catch (Exception e) { } catch (Exception e) {
logger.error("[存储收到的通道离线] 异常: ", e ); logger.error("[存储收到的通道离线] 异常: ", e );
} }
try {
executeSaveForUpdate();
} catch (Exception e) {
logger.error("[存储收到的更新通道] 异常: ", e );
}
try {
executeSaveForDelete();
} catch (Exception e) {
logger.error("[存储收到的删除通道] 异常: ", e );
}
dynamicTask.stop(talkKey); dynamicTask.stop(talkKey);
} }

View File

@ -11,7 +11,6 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
@ -20,12 +19,12 @@ import org.dom4j.Element;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
import javax.sip.header.FromHeader; import javax.sip.header.FromHeader;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -68,78 +67,100 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
private final static String talkKey = "notify-request-for-mobile-position-task"; private final static String talkKey = "notify-request-for-mobile-position-task";
// @Async("taskExecutor")
public void process(RequestEvent evt) { public void process(RequestEvent evt) {
try { try {
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
long startTime = System.currentTimeMillis();
// 回复 200 OK // 回复 200 OK
Element rootElement = getRootElement(evt); Element rootElement = getRootElement(evt);
if (rootElement == null) { if (rootElement == null) {
logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest()); logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
return; return;
} }
Device device = redisCatchStorage.getDevice(deviceId);
MobilePosition mobilePosition = new MobilePosition(); MobilePosition mobilePosition = new MobilePosition();
mobilePosition.setCreateTime(DateUtil.getNow()); mobilePosition.setCreateTime(DateUtil.getNow());
List<Element> elements = rootElement.elements();
String channelId = null;
for (Element element : elements) {
switch (element.getName()){
case "DeviceID":
channelId = element.getStringValue();
if (device == null) {
device = redisCatchStorage.getDevice(channelId);
if (device == null) {
// 根据通道id查询设备Id
List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId);
if (!deviceList.isEmpty()) {
device = deviceList.get(0);
}
}
}
if (device == null) {
logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId);
return;
}
mobilePosition.setDeviceId(device.getDeviceId());
mobilePosition.setChannelId(channelId);
// 兼容设备部分设备上报是通道编号与设备编号一致的情况
if (deviceId.equals(channelId)) {
List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
if (deviceChannels.size() == 1) {
channelId = deviceChannels.get(0).getChannelId();
}
}
if (!ObjectUtils.isEmpty(device.getName())) {
mobilePosition.setDeviceName(device.getName());
}
mobilePosition.setDeviceId(device.getDeviceId());
mobilePosition.setChannelId(channelId);
continue;
case "Time":
String timeVal = element.getStringValue();
if (ObjectUtils.isEmpty(timeVal)) {
mobilePosition.setTime(DateUtil.getNow());
} else {
mobilePosition.setTime(SipUtils.parseTime(timeVal));
}
continue;
case "Longitude":
mobilePosition.setLongitude(Double.parseDouble(element.getStringValue()));
continue;
case "Latitude":
mobilePosition.setLatitude(Double.parseDouble(element.getStringValue()));
continue;
case "Speed":
String speedVal = element.getStringValue();
if (NumericUtil.isDouble(speedVal)) {
mobilePosition.setSpeed(Double.parseDouble(speedVal));
} else {
mobilePosition.setSpeed(0.0);
}
continue;
case "Direction":
String directionVal = element.getStringValue();
if (NumericUtil.isDouble(directionVal)) {
mobilePosition.setDirection(Double.parseDouble(directionVal));
} else {
mobilePosition.setDirection(0.0);
}
continue;
case "Altitude":
String altitudeVal = element.getStringValue();
if (NumericUtil.isDouble(altitudeVal)) {
mobilePosition.setAltitude(Double.parseDouble(altitudeVal));
} else {
mobilePosition.setAltitude(0.0);
}
continue;
Element deviceIdElement = rootElement.element("DeviceID");
String channelId = deviceIdElement.getTextTrim().toString();
Device device = redisCatchStorage.getDevice(deviceId);
if (device == null) {
device = redisCatchStorage.getDevice(channelId);
if (device == null) {
// 根据通道id查询设备Id
List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId);
if (deviceList.size() > 0) {
device = deviceList.get(0);
}
} }
} }
if (device == null) {
logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId);
return;
}
// 兼容设备部分设备上报是通道编号与设备编号一致的情况
if (deviceId.equals(channelId)) {
List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
if (deviceChannels.size() == 1) {
channelId = deviceChannels.get(0).getChannelId();
}
}
if (!ObjectUtils.isEmpty(device.getName())) {
mobilePosition.setDeviceName(device.getName());
}
mobilePosition.setDeviceId(device.getDeviceId()); // logger.info("[收到移动位置订阅通知]{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
mobilePosition.setChannelId(channelId); // mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
String time = XmlUtil.getText(rootElement, "Time");
if (ObjectUtils.isEmpty(time)) {
mobilePosition.setTime(DateUtil.getNow());
} else {
mobilePosition.setTime(SipUtils.parseTime(time));
}
mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) {
mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed")));
} else {
mobilePosition.setSpeed(0.0);
}
if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) {
mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction")));
} else {
mobilePosition.setDirection(0.0);
}
if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) {
mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude")));
} else {
mobilePosition.setAltitude(0.0);
}
logger.info("[收到移动位置订阅通知]{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
mobilePosition.getLongitude(), mobilePosition.getLatitude());
mobilePosition.setReportSource("Mobile Position"); mobilePosition.setReportSource("Mobile Position");
// 更新device channel 的经纬度 // 更新device channel 的经纬度
@ -149,13 +170,13 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLongitude(mobilePosition.getLongitude());
deviceChannel.setLatitude(mobilePosition.getLatitude()); deviceChannel.setLatitude(mobilePosition.getLatitude());
deviceChannel.setGpsTime(mobilePosition.getTime()); deviceChannel.setGpsTime(mobilePosition.getTime());
updateChannelMap.put(channelId, deviceChannel); updateChannelMap.put(deviceId + channelId, deviceChannel);
addMobilePositionList.add(mobilePosition); addMobilePositionList.add(mobilePosition);
if(updateChannelMap.size() > 300) { if(updateChannelMap.size() > 100) {
executeSaveChannel(); executeSaveChannel();
} }
if (userSetting.isSavePositionHistory()) { if (userSetting.isSavePositionHistory()) {
if(addMobilePositionList.size() > 300) { if(addMobilePositionList.size() > 100) {
executeSaveMobilePosition(); executeSaveMobilePosition();
} }
} }
@ -170,7 +191,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
// deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); // deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
if (!dynamicTask.contains(talkKey)) { if (!dynamicTask.contains(talkKey)) {
dynamicTask.startDelay(talkKey, this::executeSave, 1000); dynamicTask.startDelay(talkKey, this::executeSave, 3000);
} }
} catch (DocumentException e) { } catch (DocumentException e) {
@ -186,22 +207,24 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
dynamicTask.stop(talkKey); dynamicTask.stop(talkKey);
} }
private void executeSaveChannel(){ @Async("taskExecutor")
public void executeSaveChannel(){
dynamicTask.execute();
try { try {
logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size()); logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size());
ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values()); // ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
deviceChannelService.batchUpdateChannelGPS(deviceChannels); // deviceChannelService.batchUpdateChannelGPS(deviceChannels);
updateChannelMap.clear(); updateChannelMap.clear();
}catch (Exception e) { }catch (Exception e) {
} }
} }
@Async("taskExecutor")
private void executeSaveMobilePosition(){ public void executeSaveMobilePosition(){
if (userSetting.isSavePositionHistory()) { if (userSetting.isSavePositionHistory()) {
try { try {
logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size()); // logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
deviceChannelService.batchAddMobilePosition(addMobilePositionList); // deviceChannelService.batchAddMobilePosition(addMobilePositionList);
addMobilePositionList.clear(); addMobilePositionList.clear();
}catch (Exception e) { }catch (Exception e) {
logger.info("[移动位置订阅] b添加通道轨迹点位保存失败 {}", addMobilePositionList.size()); logger.info("[移动位置订阅] b添加通道轨迹点位保存失败 {}", addMobilePositionList.size());
@ -209,6 +232,4 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
} }
} }
} }

View File

@ -25,6 +25,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
@ -76,6 +78,9 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
@Autowired @Autowired
private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor; private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
@Autowired
private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor;
private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor") @Qualifier("taskExecutor")
@ -105,10 +110,10 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
logger.error("未处理的异常 ", e); logger.error("未处理的异常 ", e);
} }
boolean runed = !taskQueue.isEmpty(); boolean runed = !taskQueue.isEmpty();
logger.info("[notify] 待处理消息数量: {}", taskQueue.size());
taskQueue.offer(new HandlerCatchData(evt, null, null)); taskQueue.offer(new HandlerCatchData(evt, null, null));
if (!runed) { if (!runed) {
taskExecutor.execute(()-> { taskExecutor.execute(()-> {
// logger.warn("开始处理");
while (!taskQueue.isEmpty()) { while (!taskQueue.isEmpty()) {
try { try {
HandlerCatchData take = taskQueue.poll(); HandlerCatchData take = taskQueue.poll();
@ -129,8 +134,12 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
logger.info("接收到Alarm通知"); logger.info("接收到Alarm通知");
processNotifyAlarm(take.getEvt()); processNotifyAlarm(take.getEvt());
} else if (CmdType.MOBILE_POSITION.equals(cmd)) { } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
logger.info("接收到MobilePosition通知"); // logger.info("接收到MobilePosition通知");
processNotifyMobilePosition(take.getEvt()); // processNotifyMobilePosition(take.getEvt());
taskExecutor.execute(() -> {
notifyRequestForMobilePositionProcessor.process(take.getEvt());
});
} else { } else {
logger.info("接收到消息:" + cmd); logger.info("接收到消息:" + cmd);
} }
@ -147,11 +156,11 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
* *
* @param evt * @param evt
*/ */
private void processNotifyMobilePosition(RequestEvent evt) { @Async("taskExecutor")
public void processNotifyMobilePosition(RequestEvent evt) {
try { try {
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
// 回复 200 OK // 回复 200 OK
Element rootElement = getRootElement(evt); Element rootElement = getRootElement(evt);
if (rootElement == null) { if (rootElement == null) {
@ -360,4 +369,9 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
this.redisCatchStorage = redisCatchStorage; this.redisCatchStorage = redisCatchStorage;
} }
@Scheduled(fixedRate = 1000) //每1秒执行一次
public void execute(){
System.out.println("待处理消息数量: " + taskQueue.size());
}
} }

View File

@ -356,11 +356,11 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
@Override @Override
public void batchUpdateChannelGPS(List<DeviceChannel> channelList) { public void batchUpdateChannelGPS(List<DeviceChannel> channelList) {
channelMapper.batchUpdate(channelList);
} }
@Override @Override
public void batchAddMobilePosition(List<MobilePosition> mobilePositions) { public void batchAddMobilePosition(List<MobilePosition> mobilePositions) {
deviceMobilePositionMapper.batchadd(mobilePositions);
} }
} }

View File

@ -347,8 +347,8 @@ public interface DeviceChannelMapper {
"<if test='item.hasAudio != null'>, has_audio=#{item.hasAudio}</if>" + "<if test='item.hasAudio != null'>, has_audio=#{item.hasAudio}</if>" +
"<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" + "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
"<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" + "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
"<if test='customLongitude != null'>, custom_longitude=#{item.customLongitude}</if>" + "<if test='item.customLongitude != null'>, custom_longitude=#{item.customLongitude}</if>" +
"<if test='custom_latitude != null'>, custom_latitude=#{item.customLatitude}</if>" + "<if test='item.customLatitude != null'>, custom_latitude=#{item.customLatitude}</if>" +
"<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" + "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
"<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" + "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
"<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" + "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +

View File

@ -33,4 +33,19 @@ public interface DeviceMobilePositionMapper {
@Delete("DELETE FROM wvp_device_mobile_position WHERE device_id = #{deviceId}") @Delete("DELETE FROM wvp_device_mobile_position WHERE device_id = #{deviceId}")
int clearMobilePositionsByDeviceId(String deviceId); int clearMobilePositionsByDeviceId(String deviceId);
@Insert("<script> " +
"insert into wvp_device_mobile_position " +
"(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
"longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
"values " +
"<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
"(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
"#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
"#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
"#{item.createTime}) " +
"</foreach> " +
"</script>")
void batchadd(List<MobilePosition> mobilePositions);
} }