优化notify消息处理

2.7.1
648540858 2024-09-26 12:39:34 +08:00
parent 0c865d1210
commit 2a2a3943f4
5 changed files with 102 additions and 57 deletions

View File

@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
@ -42,8 +41,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class); private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class);
private final List<DeviceChannel> updateChannelOnlineList = new CopyOnWriteArrayList<>(); private final List<DeviceChannel> updateChannelForStatusChange = new CopyOnWriteArrayList<>();
private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>(); // private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>();
private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>(); private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>();
@ -63,9 +62,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
@Autowired @Autowired
private IDeviceChannelService deviceChannelService; private IDeviceChannelService deviceChannelService;
@Autowired
private DynamicTask dynamicTask;
@Autowired @Autowired
private SipConfig sipConfig; private SipConfig sipConfig;
@ -77,13 +73,24 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
} }
taskQueue.offer(new HandlerCatchData(evt, null, null)); taskQueue.offer(new HandlerCatchData(evt, null, null));
} }
// @Scheduled(fixedRate = 2000) //每400毫秒执行一次
// public void showSize(){
// logger.warn("[notify-目录订阅] 待处理消息数量: {}", taskQueue.size() );
// }
@Scheduled(fixedRate = 400) //每400毫秒执行一次 @Scheduled(fixedRate = 400) //每400毫秒执行一次
public void executeTaskQueue(){ public void executeTaskQueue(){
if (taskQueue.isEmpty()) { if (taskQueue.isEmpty()) {
return; return;
} }
for (HandlerCatchData take : taskQueue) { List<HandlerCatchData> handlerCatchDataList = new ArrayList<>();
while (!taskQueue.isEmpty()) {
handlerCatchDataList.add(taskQueue.poll());
}
if (handlerCatchDataList.isEmpty()) {
return;
}
for (HandlerCatchData take : handlerCatchDataList) {
if (take == null) { if (take == null) {
continue; continue;
} }
@ -130,12 +137,15 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
channel.setParentId(null); channel.setParentId(null);
} }
channel.setDeviceId(device.getDeviceId()); channel.setDeviceId(device.getDeviceId());
logger.info("[收到目录订阅]{}/{}", device.getDeviceId(), channel.getChannelId()); if (logger.isDebugEnabled()){
logger.debug("[收到目录订阅]{}/{}", device.getDeviceId(), channel.getChannelId());
}
switch (event) { switch (event) {
case CatalogEvent.ON: case CatalogEvent.ON:
// 上线 // 上线
logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
updateChannelOnlineList.add(channel); channel.setStatus(true);
updateChannelForStatusChange.add(channel);
if (userSetting.getDeviceStatusNotify()) { if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息 // 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
@ -147,7 +157,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
if (userSetting.getRefuseChannelStatusChannelFormNotify()) { if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
} else { } else {
updateChannelOfflineList.add(channel); channel.setStatus(false);
updateChannelForStatusChange.add(channel);
if (userSetting.getDeviceStatusNotify()) { if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息 // 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
@ -160,7 +171,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
if (userSetting.getRefuseChannelStatusChannelFormNotify()) { if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
} else { } else {
updateChannelOfflineList.add(channel); channel.setStatus(false);
updateChannelForStatusChange.add(channel);
if (userSetting.getDeviceStatusNotify()) { if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息 // 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
@ -173,7 +185,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
if (userSetting.getRefuseChannelStatusChannelFormNotify()) { if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
} else { } else {
updateChannelOfflineList.add(channel); channel.setStatus(false);
updateChannelForStatusChange.add(channel);
if (userSetting.getDeviceStatusNotify()) { if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息 // 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
@ -211,6 +224,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
case CatalogEvent.UPDATE: case CatalogEvent.UPDATE:
// 更新 // 更新
logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
// 判断此通道是否存在 // 判断此通道是否存在
DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId()); DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
if (deviceChannelForUpdate != null) { if (deviceChannelForUpdate != null) {
@ -242,8 +256,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
taskQueue.clear(); taskQueue.clear();
if (!updateChannelMap.keySet().isEmpty() if (!updateChannelMap.keySet().isEmpty()
|| !addChannelMap.keySet().isEmpty() || !addChannelMap.keySet().isEmpty()
|| !updateChannelOnlineList.isEmpty() || !updateChannelForStatusChange.isEmpty()
|| !updateChannelOfflineList.isEmpty()
|| !deleteChannelList.isEmpty()) { || !deleteChannelList.isEmpty()) {
executeSave(); executeSave();
} }
@ -256,14 +269,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
logger.error("[存储收到的增加通道] 异常: ", e ); logger.error("[存储收到的增加通道] 异常: ", e );
} }
try { try {
executeSaveForOnline(); executeSaveForStatus();
} catch (Exception e) { } catch (Exception e) {
logger.error("[存储收到的通道上线] 异常: ", e ); logger.error("[存储收到的通道状态变化] 异常: ", e );
}
try {
executeSaveForOffline();
} catch (Exception e) {
logger.error("[存储收到的通道离线] 异常: ", e );
} }
try { try {
executeSaveForUpdate(); executeSaveForUpdate();
@ -301,22 +309,10 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
} }
} }
private void executeSaveForOnline(){ private void executeSaveForStatus(){
if (!updateChannelOnlineList.isEmpty()) { if (!updateChannelForStatusChange.isEmpty()) {
deviceChannelService.channelsOnline(updateChannelOnlineList); deviceChannelService.updateChannelsStaus(updateChannelForStatusChange);
updateChannelOnlineList.clear(); updateChannelForStatusChange.clear();
} }
} }
private void executeSaveForOffline(){
if (!updateChannelOfflineList.isEmpty()) {
deviceChannelService.channelsOffline(updateChannelOfflineList);
updateChannelOfflineList.clear();
}
}
// @Scheduled(fixedRate = 10000) //每1秒执行一次
// public void execute(){
// logger.info("[待处理Notify-目录订阅消息数量]: {}", taskQueue.size());
// }
} }

View File

@ -25,6 +25,7 @@ import org.springframework.util.ObjectUtils;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
import javax.sip.header.FromHeader; import javax.sip.header.FromHeader;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
@ -68,7 +69,14 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
if (taskQueue.isEmpty()) { if (taskQueue.isEmpty()) {
return; return;
} }
for (HandlerCatchData take : taskQueue) { List<HandlerCatchData> handlerCatchDataList = new ArrayList<>();
while (!taskQueue.isEmpty()) {
handlerCatchDataList.add(taskQueue.poll());
}
if (handlerCatchDataList.isEmpty()) {
return;
}
for (HandlerCatchData take : handlerCatchDataList) {
if (take == null) { if (take == null) {
continue; continue;
} }

View File

@ -67,12 +67,7 @@ public interface IDeviceChannelService {
/** /**
* 线 * 线
*/ */
int channelsOnline(List<DeviceChannel> channels); int updateChannelsStaus(List<DeviceChannel> channels);
/**
* 线
*/
int channelsOffline(List<DeviceChannel> channels);
/** /**
* *

View File

@ -239,13 +239,45 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
} }
@Override @Override
public int deleteChannels(List<DeviceChannel> deleteChannelList) { @Transactional
return channelMapper.batchDel(deleteChannelList); public int deleteChannels(List<DeviceChannel> channels) {
int limitCount = 1000;
int result = 0;
if (!channels.isEmpty()) {
if (channels.size() > limitCount) {
for (int i = 0; i < channels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > channels.size()) {
toIndex = channels.size();
}
result += channelMapper.batchDel(channels.subList(i, toIndex));
}
}else {
result += channelMapper.batchDel(channels);
}
}
return result;
} }
@Override @Override
public int channelsOnline(List<DeviceChannel> channels) { @Transactional
return channelMapper.batchOnline(channels); public int updateChannelsStaus(List<DeviceChannel> channels) {
int limitCount = 1000;
int result = 0;
if (!channels.isEmpty()) {
if (channels.size() > limitCount) {
for (int i = 0; i < channels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > channels.size()) {
toIndex = channels.size();
}
result += channelMapper.batchUpdateStatus(channels.subList(i, toIndex));
}
}else {
result += channelMapper.batchUpdateStatus(channels);
}
}
return result;
} }
@Override @Override
@ -253,12 +285,6 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
channelMapper.online(channel.getDeviceId(), channel.getChannelId()); channelMapper.online(channel.getDeviceId(), channel.getChannelId());
} }
@Override
public int channelsOffline(List<DeviceChannel> channels) {
return channelMapper.batchOffline(channels);
}
@Override @Override
public void offline(DeviceChannel channel) { public void offline(DeviceChannel channel) {
channelMapper.offline(channel.getDeviceId(), channel.getChannelId()); channelMapper.offline(channel.getDeviceId(), channel.getChannelId());
@ -275,6 +301,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
} }
@Override @Override
@Transactional
public synchronized void batchUpdateChannel(List<DeviceChannel> channels) { public synchronized void batchUpdateChannel(List<DeviceChannel> channels) {
String now = DateUtil.getNow(); String now = DateUtil.getNow();
for (DeviceChannel channel : channels) { for (DeviceChannel channel : channels) {
@ -297,8 +324,27 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
} }
@Override @Override
@Transactional
public void batchAddChannel(List<DeviceChannel> channels) { public void batchAddChannel(List<DeviceChannel> channels) {
channelMapper.batchAdd(channels); String now = DateUtil.getNow();
for (DeviceChannel channel : channels) {
channel.setUpdateTime(now);
channel.setCreateTime(now);
}
int limitCount = 1000;
if (!channels.isEmpty()) {
if (channels.size() > limitCount) {
for (int i = 0; i < channels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > channels.size()) {
toIndex = channels.size();
}
channelMapper.batchAdd(channels.subList(i, toIndex));
}
}else {
channelMapper.batchAdd(channels);
}
}
for (DeviceChannel channel : channels) { for (DeviceChannel channel : channels) {
if (channel.getParentId() != null) { if (channel.getParentId() != null) {
channelMapper.updateChannelSubCount(channel.getDeviceId(), channel.getParentId()); channelMapper.updateChannelSubCount(channel.getDeviceId(), channel.getParentId());

View File

@ -479,10 +479,10 @@ public interface DeviceChannelMapper {
@Update({"<script>" + @Update({"<script>" +
"<foreach collection='channels' item='item' separator=';'>" + "<foreach collection='channels' item='item' separator=';'>" +
"UPDATE wvp_device_channel SET status=true WHERE device_id=#{item.deviceId} AND channel_id=#{item.channelId}" + "UPDATE wvp_device_channel SET status=#{item.status} WHERE device_id=#{item.deviceId} AND channel_id=#{item.channelId}" +
"</foreach>" + "</foreach>" +
"</script>"}) "</script>"})
int batchOnline(@Param("channels") List<DeviceChannel> channels); int batchUpdateStatus(@Param("channels") List<DeviceChannel> channels);
@Update({"<script>" + @Update({"<script>" +
"<foreach collection='channels' item='item' separator=';'>" + "<foreach collection='channels' item='item' separator=';'>" +