优化报警推送

pull/1669/head
648540858 2024-10-28 17:53:27 +08:00
parent c25807f65d
commit bf9262283e
6 changed files with 423 additions and 287 deletions

View File

@ -39,7 +39,7 @@ public class VManageBootstrap extends SpringBootServletInitializer {
}else { }else {
log.info("构建版本: {}", gitUtil.getBuildVersion()); log.info("构建版本: {}", gitUtil.getBuildVersion());
log.info("构建时间: {}", gitUtil.getBuildDate()); log.info("构建时间: {}", gitUtil.getBuildDate());
log.info("GIT最后提交时间: {}", gitUtil.getCommitTime()); log.info("GIT信息: 分支: {}, ID: {}, 时间: {}", gitUtil.getBranch(), gitUtil.getCommitIdShort(), gitUtil.getCommitTime());
} }
} }
// 项目重启 // 项目重启

View File

@ -1,11 +1,16 @@
package com.genersoft.iot.vmp.gb28181.bean; package com.genersoft.iot.vmp.gb28181.bean;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.util.HashSet;
import java.util.Set;
/** /**
* @author lin * @author lin
*/ */
@Schema(description = "报警信息") @Schema(description = "报警信息")
@Data
public class DeviceAlarm { public class DeviceAlarm {
/** /**
@ -32,6 +37,24 @@ public class DeviceAlarm {
@Schema(description = "报警级别, 1为一级警情, 2为二级警情, 3为三级警情, 4为四级警情") @Schema(description = "报警级别, 1为一级警情, 2为二级警情, 3为三级警情, 4为四级警情")
private String alarmPriority; private String alarmPriority;
@Schema(description = "报警级别, 1为一级警情, 2为二级警情, 3为三级警情, 4为四级警情")
private String alarmPriorityDescription;
public String getAlarmPriorityDescription() {
switch (alarmPriority) {
case "1":
return "一级警情";
case "2":
return "二级警情";
case "3":
return "三级警情";
case "4":
return "四级警情";
default:
return alarmPriority;
}
}
/** /**
* , 1, 2, 3, 4 GPS, 5, 6, * , 1, 2, 3, 4 GPS, 5, 6,
* 7;12 - * 7;12 -
@ -40,6 +63,41 @@ public class DeviceAlarm {
"\t * 7其他报警;可以为直接组合如12为电话报警或设备报警") "\t * 7其他报警;可以为直接组合如12为电话报警或设备报警")
private String alarmMethod; private String alarmMethod;
private String alarmMethodDescription;
public String getAlarmMethodDescription() {
StringBuilder stringBuilder = new StringBuilder();
char[] charArray = alarmMethod.toCharArray();
for (char c : charArray) {
switch (c) {
case '1':
stringBuilder.append("-电话报警");
break;
case '2':
stringBuilder.append("-设备报警");
break;
case '3':
stringBuilder.append("-短信报警");
break;
case '4':
stringBuilder.append("-GPS报警");
break;
case '5':
stringBuilder.append("-视频报警");
break;
case '6':
stringBuilder.append("-设备故障报警");
break;
case '7':
stringBuilder.append("-其他报警");
break;
}
}
stringBuilder.delete(0, 1);
return stringBuilder.toString();
}
/** /**
* *
*/ */
@ -93,95 +151,122 @@ public class DeviceAlarm {
@Schema(description = "报警类型") @Schema(description = "报警类型")
private String alarmType; private String alarmType;
public String getAlarmTypeDescription() {
if (alarmType == null) {
return "";
}
char[] charArray = alarmMethod.toCharArray();
Set<String> alarmMethodSet = new HashSet<>();
for (char c : charArray) {
alarmMethodSet.add(Character.toString(c));
}
String result = alarmType;
if (alarmMethodSet.contains("2")) {
switch (alarmType) {
case "1":
result = "视频丢失报警";
break;
case "2":
result = "设备防拆报警";
break;
case "3":
result = "存储设备磁盘满报警";
break;
case "4":
result = "设备高温报警";
break;
case "5":
result = "设备低温报警";
break;
}
}
if (alarmMethodSet.contains("5")) {
switch (alarmType) {
case "1":
result = "人工视频报警";
break;
case "2":
result = "运动目标检测报警";
break;
case "3":
result = "遗留物检测报警";
break;
case "4":
result = "物体移除检测报警";
break;
case "5":
result = "绊线检测报警";
break;
case "6":
result = "入侵检测报警";
break;
case "7":
result = "逆行检测报警";
break;
case "8":
result = "徘徊检测报警";
break;
case "9":
result = "流量统计报警";
break;
case "10":
result = "密度检测报警";
break;
case "11":
result = "视频异常检测报警";
break;
case "12":
result = "快速移动报警";
break;
}
}
if (alarmMethodSet.contains("6")) {
switch (alarmType) {
case "1":
result = "人工视频报警";
break;
case "2":
result = "运动目标检测报警";
break;
case "3":
result = "遗留物检测报警";
break;
case "4":
result = "物体移除检测报警";
break;
case "5":
result = "绊线检测报警";
break;
case "6":
result = "入侵检测报警";
break;
case "7":
result = "逆行检测报警";
break;
case "8":
result = "徘徊检测报警";
break;
case "9":
result = "流量统计报警";
break;
case "10":
result = "密度检测报警";
break;
case "11":
result = "视频异常检测报警";
break;
case "12":
result = "快速移动报警";
break;
}
}
return result;
}
@Schema(description = "报警类型描述")
private String alarmTypeDescription;
@Schema(description = "创建时间") @Schema(description = "创建时间")
private String createTime; private String createTime;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getAlarmPriority() {
return alarmPriority;
}
public void setAlarmPriority(String alarmPriority) {
this.alarmPriority = alarmPriority;
}
public String getAlarmMethod() {
return alarmMethod;
}
public void setAlarmMethod(String alarmMethod) {
this.alarmMethod = alarmMethod;
}
public String getAlarmTime() {
return alarmTime;
}
public void setAlarmTime(String alarmTime) {
this.alarmTime = alarmTime;
}
public String getAlarmDescription() {
return alarmDescription;
}
public void setAlarmDescription(String alarmDescription) {
this.alarmDescription = alarmDescription;
}
public double getLongitude() {
return longitude;
}
public void setLongitude(double longitude) {
this.longitude = longitude;
}
public double getLatitude() {
return latitude;
}
public void setLatitude(double latitude) {
this.latitude = latitude;
}
public String getAlarmType() {
return alarmType;
}
public void setAlarmType(String alarmType) {
this.alarmType = alarmType;
}
public String getChannelId() {
return channelId;
}
public void setChannelId(String channelId) {
this.channelId = channelId;
}
public String getCreateTime() {
return createTime;
}
public void setCreateTime(String createTime) {
this.createTime = createTime;
}
} }

View File

@ -0,0 +1,21 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;
@Data
public class SSEMessage<T> {
private String event;
private T data;
public static SSEMessage<DeviceAlarm> getInstance(String event, DeviceAlarm data) {
SSEMessage<DeviceAlarm> message = new SSEMessage<>();
message.setEvent(event);
message.setData(data);
return message;
}
public String ecode(){
return String.format("event:%s\ndata:%s\n", event, JSONObject.toJSONString(data));
}
}

View File

@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.event.alarm; package com.genersoft.iot.vmp.gb28181.event.alarm;
import com.genersoft.iot.vmp.gb28181.bean.SSEMessage;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
@ -25,12 +26,12 @@ public class AlarmEventListener implements ApplicationListener<AlarmEvent> {
public void addSseEmitter(String browserId, PrintWriter writer) { public void addSseEmitter(String browserId, PrintWriter writer) {
SSE_CACHE.put(browserId, writer); SSE_CACHE.put(browserId, writer);
log.info("SSE 在线数: {}", SSE_CACHE.size()); log.info("[SSE推送] 连接已建立, 浏览器 ID: {}, 当前在线数: {}", browserId, SSE_CACHE.size());
} }
public void removeSseEmitter(String browserId, PrintWriter writer) { public void removeSseEmitter(String browserId, PrintWriter writer) {
SSE_CACHE.remove(browserId, writer); SSE_CACHE.remove(browserId, writer);
log.info("SSE 在线数: {}", SSE_CACHE.size()); log.info("[SSE推送] 连接已断开, 浏览器 ID: {}, 当前在线数: {}", browserId, SSE_CACHE.size());
} }
@Override @Override
@ -39,14 +40,17 @@ public class AlarmEventListener implements ApplicationListener<AlarmEvent> {
log.debug("设备报警事件触发, deviceId: {}, {}", event.getAlarmInfo().getDeviceId(), event.getAlarmInfo().getAlarmDescription()); log.debug("设备报警事件触发, deviceId: {}, {}", event.getAlarmInfo().getDeviceId(), event.getAlarmInfo().getAlarmDescription());
} }
String msg = "<strong>设备编号:</strong> <i>" + event.getAlarmInfo().getDeviceId() + "</i>" log.info("设备报警事件触发, deviceId: {}, {}", event.getAlarmInfo().getDeviceId(), event.getAlarmInfo().getAlarmDescription());
String msg = "<strong>设备:</strong> <i>" + event.getAlarmInfo().getDeviceId() + "</i>"
+ "<br><strong>通道编号:</strong> <i>" + event.getAlarmInfo().getChannelId() + "</i>" + "<br><strong>通道编号:</strong> <i>" + event.getAlarmInfo().getChannelId() + "</i>"
+ "<br><strong>报警描述:</strong> <i>" + event.getAlarmInfo().getAlarmDescription() + "</i>" + "<br><strong>报警描述:</strong> <i>" + event.getAlarmInfo().getAlarmDescription() + "</i>"
+ "<br><strong>报警时间:</strong> <i>" + event.getAlarmInfo().getAlarmTime() + "</i>"; + "<br><strong>报警时间:</strong> <i>" + event.getAlarmInfo().getAlarmTime() + "</i>";
for (Iterator<Map.Entry<String, PrintWriter>> it = SSE_CACHE.entrySet().iterator(); it.hasNext(); ) { for (Iterator<Map.Entry<String, PrintWriter>> it = SSE_CACHE.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<String, PrintWriter> response = it.next(); Map.Entry<String, PrintWriter> response = it.next();
log.info("推送到 SSE 连接, 浏览器 ID: {}", response.getKey());
try { try {
PrintWriter writer = response.getValue(); PrintWriter writer = response.getValue();
@ -58,9 +62,13 @@ public class AlarmEventListener implements ApplicationListener<AlarmEvent> {
String sseMsg = "event:message\n" + String sseMsg = "event:message\n" +
"data:" + msg + "\n" + "data:" + msg + "\n" +
"\n"; "\n";
writer.write(sseMsg); System.out.println(
SSEMessage.getInstance("message", event.getAlarmInfo()).ecode()
);
writer.write(SSEMessage.getInstance("message", event.getAlarmInfo()).ecode());
writer.flush(); writer.flush();
} catch (Exception e) { } catch (Exception e) {
log.error("[发送SSE] 失败", e);
it.remove(); it.remove();
} }
} }

View File

@ -20,8 +20,7 @@ import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element; import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
@ -30,6 +29,8 @@ import javax.sip.RequestEvent;
import javax.sip.SipException; import javax.sip.SipException;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.text.ParseException; import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
@ -64,12 +65,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
@Autowired @Autowired
private IDeviceChannelService deviceChannelService; private IDeviceChannelService deviceChannelService;
private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
@ -78,25 +74,45 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
@Override @Override
public void handForDevice(RequestEvent evt, Device device, Element rootElement) { public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
boolean isEmpty = taskQueue.isEmpty(); if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
log.error("[Alarm] 待处理消息队列已满 {}返回486 BUSY_HERE消息不做处理", userSetting.getMaxNotifyCountQueue());
return;
}
taskQueue.offer(new SipMsgInfo(evt, device, rootElement)); taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
}
@Scheduled(fixedDelay = 200)
public void executeTaskQueue() {
if (taskQueue.isEmpty()) {
return;
}
List<SipMsgInfo> handlerCatchDataList = new ArrayList<>();
int size = taskQueue.size();
for (int i = 0; i < size; i++) {
SipMsgInfo poll = taskQueue.poll();
if (poll != null) {
handlerCatchDataList.add(poll);
}
}
if (handlerCatchDataList.isEmpty()) {
return;
}
for (SipMsgInfo sipMsgInfo : handlerCatchDataList) {
if (sipMsgInfo == null) {
continue;
}
RequestEvent evt = sipMsgInfo.getEvt();
System.out.println(evt.getRequest());
// 回复200 OK // 回复200 OK
try { try {
responseAck((SIPRequest) evt.getRequest(), Response.OK); responseAck((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 报警通知回复: {}", e.getMessage()); log.error("[命令发送失败] 报警通知回复: {}", e.getMessage());
} }
if (isEmpty) {
taskExecutor.execute(() -> {
if (log.isDebugEnabled()) {
log.info("[处理报警通知]待处理数量:{}", taskQueue.size() );
}
while (!taskQueue.isEmpty()) {
try { try {
SipMsgInfo sipMsgInfo = taskQueue.poll(); Device device = sipMsgInfo.getDevice();
Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID"); Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID");
String channelId = deviceIdElement.getText().toString(); String channelId = deviceIdElement.getText();
DeviceAlarm deviceAlarm = new DeviceAlarm(); DeviceAlarm deviceAlarm = new DeviceAlarm();
deviceAlarm.setCreateTime(DateUtil.getNow()); deviceAlarm.setCreateTime(DateUtil.getNow());
@ -184,13 +200,11 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) { if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) {
publisher.deviceAlarmEventPublish(deviceAlarm); publisher.deviceAlarmEventPublish(deviceAlarm);
} }
}catch (Exception e) { } catch (Exception e) {
log.error("未处理的异常 ", e); log.error("未处理的异常 ", e);
log.warn("[收到报警通知] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest()); log.warn("[收到报警通知] 发现未处理的异常, {}\r\n{}", e.getMessage(), evt.getRequest());
} }
} }
});
}
} }
@Override @Override
@ -203,7 +217,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
log.error("[命令发送失败] 国标级联 报警通知回复: {}", e.getMessage()); log.error("[命令发送失败] 国标级联 报警通知回复: {}", e.getMessage());
} }
Element deviceIdElement = rootElement.element("DeviceID"); Element deviceIdElement = rootElement.element("DeviceID");
String channelId = deviceIdElement.getText().toString(); String channelId = deviceIdElement.getText();
DeviceAlarm deviceAlarm = new DeviceAlarm(); DeviceAlarm deviceAlarm = new DeviceAlarm();

View File

@ -112,17 +112,25 @@ export default {
let that = this; let that = this;
if (this.alarmNotify) { if (this.alarmNotify) {
console.log("申请SSE推送API调用浏览器ID: " + this.$browserId); console.log("申请SSE推送API调用浏览器ID: " + this.$browserId);
this.sseSource = new EventSource('/api/emit?browserId=' + this.$browserId); let url = (process.env.NODE_ENV === 'development' ? "debug" : "") + 'api/emit?browserId=' + this.$browserId
this.sseSource = new EventSource(url);
this.sseSource.addEventListener('message', function (evt) { this.sseSource.addEventListener('message', function (evt) {
console.log("收到信息:" + evt.data);
let data = JSON.parse(evt.data)
that.$notify({ that.$notify({
title: '报警信息', title: '报警信息',
dangerouslyUseHTMLString: true, dangerouslyUseHTMLString: true,
message: evt.data, message: `<strong>设备:</strong> <i> ${data.deviceId}</i>` +
`<br><strong>通道编号:</strong> <i>${ data.channelId}</i>` +
`<br><strong>报警级别:</strong> <i>${ data.alarmPriorityDescription}</i>` +
`<br><strong>报警方式:</strong> <i>${ data.alarmMethodDescription}</i>` +
`<br><strong>报警类型:</strong> <i>${ data.alarmTypeDescription}</i>` +
`<br><strong>报警时间:</strong> <i>${ data.alarmTime}</i>`,
type: 'warning', type: 'warning',
position: 'bottom-right', position: 'bottom-right',
duration: 3000 duration: 3000
}); });
console.log("收到信息:" + evt.data);
}); });
this.sseSource.addEventListener('open', function (e) { this.sseSource.addEventListener('open', function (e) {
console.log("SSE连接打开."); console.log("SSE连接打开.");