diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index 162f9def..8381784e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -78,7 +78,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress()); if (!device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) { - log.info("[收到心跳] 设备{}地址变化, 远程地址为: {}:{}", device.getDeviceId(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort()); + log.info("[收到心跳] 设备{}地址变化, {}:{}->{}", device.getDeviceId(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort(), request.getLocalAddress().getHostAddress()); device.setPort(remoteAddressInfo.getPort()); device.setHostAddress(remoteAddressInfo.getIp().concat(":").concat(String.valueOf(remoteAddressInfo.getPort()))); device.setIp(remoteAddressInfo.getIp()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java index e31c65cb..d4f65e13 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java @@ -15,10 +15,9 @@ import com.genersoft.iot.vmp.service.IMobilePositionService; import com.genersoft.iot.vmp.utils.DateUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; @@ -26,6 +25,7 @@ import javax.sip.InvalidArgumentException; import javax.sip.SipException; import javax.validation.constraints.NotNull; import java.text.ParseException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -55,111 +55,119 @@ public class RedisAlarmMsgListener implements MessageListener { @Autowired private IPlatformService platformService; - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; + private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Autowired private UserSetting userSetting; @Override public void onMessage(@NotNull Message message, byte[] bytes) { - log.info("收到来自REDIS的ALARM通知: {}", new String(message.getBody())); - boolean isEmpty = taskQueue.isEmpty(); + log.info("[REDIS: ALARM]: {}", new String(message.getBody())); taskQueue.offer(message); - if (isEmpty) { -// logger.info("[线程池信息]活动线程数:{}, 最大线程数: {}", taskExecutor.getActiveCount(), taskExecutor.getMaxPoolSize()); - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class); - if (alarmChannelMessage == null) { - log.warn("[REDIS的ALARM通知]消息解析失败"); - continue; - } - String gbId = alarmChannelMessage.getGbId(); + } - DeviceAlarm deviceAlarm = new DeviceAlarm(); - deviceAlarm.setCreateTime(DateUtil.getNow()); - deviceAlarm.setChannelId(gbId); - deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription()); - deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn()); - deviceAlarm.setAlarmType("" + alarmChannelMessage.getAlarmType()); - deviceAlarm.setAlarmPriority("1"); - deviceAlarm.setAlarmTime(DateUtil.getNow()); - deviceAlarm.setLongitude(0); - deviceAlarm.setLatitude(0); + @Scheduled(fixedDelay = 100) + public void executeTaskQueue() { + if (taskQueue.isEmpty()) { + return; + } + List messageDataList = new ArrayList<>(); + int size = taskQueue.size(); + for (int i = 0; i < size; i++) { + Message msg = taskQueue.poll(); + if (msg != null) { + messageDataList.add(msg); + } + } + if (messageDataList.isEmpty()) { + return; + } + for (Message msg : messageDataList) { + try { + AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class); + if (alarmChannelMessage == null) { + log.warn("[REDIS的ALARM通知]消息解析失败"); + continue; + } + String gbId = alarmChannelMessage.getGbId(); - if (ObjectUtils.isEmpty(gbId)) { - if (userSetting.getSendToPlatformsWhenIdLost()) { - // 发送给所有的上级 - List parentPlatforms = platformService.queryEnablePlatformList(); - if (!parentPlatforms.isEmpty()) { - for (Platform parentPlatform : parentPlatforms) { - try { - deviceAlarm.setChannelId(parentPlatform.getDeviceGBId()); - commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage()); - } - } - } - }else { - // 获取开启了消息推送的设备和平台 - List parentPlatforms = mobilePositionService.queryEnablePlatformListWithAsMessageChannel(); - if (parentPlatforms.size() > 0) { - for (Platform parentPlatform : parentPlatforms) { - try { - deviceAlarm.setChannelId(parentPlatform.getDeviceGBId()); - commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage()); - } - } - } + DeviceAlarm deviceAlarm = new DeviceAlarm(); + deviceAlarm.setCreateTime(DateUtil.getNow()); + deviceAlarm.setChannelId(gbId); + deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription()); + deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn()); + deviceAlarm.setAlarmType("" + alarmChannelMessage.getAlarmType()); + deviceAlarm.setAlarmPriority("1"); + deviceAlarm.setAlarmTime(DateUtil.getNow()); + deviceAlarm.setLongitude(0); + deviceAlarm.setLatitude(0); - } - // 获取开启了消息推送的设备和平台 - List devices = channelService.queryDeviceWithAsMessageChannel(); - if (devices.size() > 0) { - for (Device device : devices) { - try { - deviceAlarm.setChannelId(device.getDeviceId()); - commander.sendAlarmMessage(device, deviceAlarm); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 发送报警: {}", e.getMessage()); - } - } - } - - }else { - Device device = deviceService.getDeviceByDeviceId(gbId); - Platform platform = platformService.queryPlatformByServerGBId(gbId); - if (device != null && platform == null) { + if (ObjectUtils.isEmpty(gbId)) { + if (userSetting.getSendToPlatformsWhenIdLost()) { + // 发送给所有的上级 + List parentPlatforms = platformService.queryEnablePlatformList(); + if (!parentPlatforms.isEmpty()) { + for (Platform parentPlatform : parentPlatforms) { try { - commander.sendAlarmMessage(device, deviceAlarm); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 发送报警: {}", e.getMessage()); + deviceAlarm.setChannelId(parentPlatform.getDeviceGBId()); + commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage()); } - }else if (device == null && platform != null){ - try { - commanderForPlatform.sendAlarmMessage(platform, deviceAlarm); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 发送报警: {}", e.getMessage()); - } - }else { - log.warn("无法确定" + gbId + "是平台还是设备"); } } - }catch (Exception e) { - log.error("未处理的异常 ", e); - log.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage()); + } else { + // 获取开启了消息推送的设备和平台 + List parentPlatforms = mobilePositionService.queryEnablePlatformListWithAsMessageChannel(); + if (!parentPlatforms.isEmpty()) { + for (Platform parentPlatform : parentPlatforms) { + try { + deviceAlarm.setChannelId(parentPlatform.getDeviceGBId()); + commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage()); + } + } + } + + } + // 获取开启了消息推送的设备和平台 + List devices = channelService.queryDeviceWithAsMessageChannel(); + if (!devices.isEmpty()) { + for (Device device : devices) { + try { + deviceAlarm.setChannelId(device.getDeviceId()); + commander.sendAlarmMessage(device, deviceAlarm); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 发送报警: {}", e.getMessage()); + } + } + } + + } else { + Device device = deviceService.getDeviceByDeviceId(gbId); + Platform platform = platformService.queryPlatformByServerGBId(gbId); + if (device != null && platform == null) { + try { + commander.sendAlarmMessage(device, deviceAlarm); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 发送报警: {}", e.getMessage()); + } + } else if (device == null && platform != null) { + try { + commanderForPlatform.sendAlarmMessage(platform, deviceAlarm); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 发送报警: {}", e.getMessage()); + } + } else { + log.warn("无法确定" + gbId + "是平台还是设备"); } } - }); + } catch (Exception e) { + log.error("未处理的异常 ", e); + log.warn("[REDIS的ALARM通知] 发现未处理的异常, {}", e.getMessage()); + } } } } + diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java index e0224839..f0ba9425 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java @@ -6,12 +6,13 @@ import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -26,31 +27,40 @@ public class RedisCloseStreamMsgListener implements MessageListener { @Autowired private IStreamPushService pushService; - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; + private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Override public void onMessage(@NotNull Message message, byte[] bytes) { - boolean isEmpty = taskQueue.isEmpty(); + log.info("[REDIS: 关闭流]: {}", new String(message.getBody())); taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - JSONObject jsonObject = JSON.parseObject(msg.getBody()); - String app = jsonObject.getString("app"); - String stream = jsonObject.getString("stream"); - pushService.stopByAppAndStream(app, stream); - }catch (Exception e) { - log.warn("[REDIS的关闭推流通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); - log.error("[REDIS的关闭推流通知] 异常内容: ", e); - } - } - }); + } + + @Scheduled(fixedDelay = 100) + public void executeTaskQueue() { + if (taskQueue.isEmpty()) { + return; + } + List messageDataList = new ArrayList<>(); + int size = taskQueue.size(); + for (int i = 0; i < size; i++) { + Message msg = taskQueue.poll(); + if (msg != null) { + messageDataList.add(msg); + } + } + if (messageDataList.isEmpty()) { + return; + } + for (Message msg : messageDataList) { + try { + JSONObject jsonObject = JSON.parseObject(msg.getBody()); + String app = jsonObject.getString("app"); + String stream = jsonObject.getString("stream"); + pushService.stopByAppAndStream(app, stream); + }catch (Exception e) { + log.warn("[REDIS的关闭推流通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(msg)); + log.error("[REDIS的关闭推流通知] 异常内容: ", e); + } } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java index fd304222..bd34dd6f 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java @@ -33,11 +33,12 @@ public class RedisGpsMsgListener implements MessageListener { @Autowired private IStreamPushService streamPushService; - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Override public void onMessage(@NotNull Message message, byte[] bytes) { + log.debug("[REDIS: GPS]: {}", new String(message.getBody())); taskQueue.offer(message); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamListMsgListener.java index 1c2845a4..ca704828 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamListMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamListMsgListener.java @@ -8,11 +8,9 @@ import com.genersoft.iot.vmp.streamPush.bean.StreamPush; import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; import com.genersoft.iot.vmp.utils.DateUtil; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -38,86 +36,93 @@ public class RedisPushStreamListMsgListener implements MessageListener { @Resource private IStreamPushService streamPushService; - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; + private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Override public void onMessage(Message message, byte[] bytes) { - log.info("[REDIS消息-推流设备列表更新]: {}", new String(message.getBody())); - boolean isEmpty = taskQueue.isEmpty(); + log.info("[REDIS: 流设备列表更新]: {}", new String(message.getBody())); taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - List streamPushItems = JSON.parseArray(new String(msg.getBody()), RedisPushStreamMessage.class); - //查询全部的app+stream 用于判断是添加还是修改 - Map allAppAndStream = streamPushService.getAllAppAndStreamMap(); - Map allGBId = streamPushService.getAllGBId(); + } - /** - * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 - */ - List streamPushItemForSave = new ArrayList<>(); - List streamPushItemForUpdate = new ArrayList<>(); - for (RedisPushStreamMessage pushStreamMessage : streamPushItems) { - String app = pushStreamMessage.getApp(); - String stream = pushStreamMessage.getStream(); - boolean contains = allAppAndStream.containsKey(app + stream); - //不存在就添加 - if (!contains) { - if (allGBId.containsKey(pushStreamMessage.getGbId())) { - StreamPush streamPushInDb = allGBId.get(pushStreamMessage.getGbId()); - log.warn("[REDIS消息-推流设备列表更新-INSERT] 国标编号重复: {}, 已分配给{}/{}", - streamPushInDb.getGbDeviceId(), streamPushInDb.getApp(), streamPushInDb.getStream()); - continue; - } - StreamPush streamPush = pushStreamMessage.buildstreamPush(); - streamPush.setCreateTime(DateUtil.getNow()); - streamPush.setUpdateTime(DateUtil.getNow()); - streamPush.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); - streamPushItemForSave.add(streamPush); - allGBId.put(streamPush.getGbDeviceId(), streamPush); - } else { - StreamPush streamPushForGbDeviceId = allGBId.get(pushStreamMessage.getGbId()); - if (streamPushForGbDeviceId != null - && (!streamPushForGbDeviceId.getApp().equals(pushStreamMessage.getApp()) - || !streamPushForGbDeviceId.getStream().equals(pushStreamMessage.getStream()))) { - StreamPush streamPushInDb = allGBId.get(pushStreamMessage.getGbId()); - log.warn("[REDIS消息-推流设备列表更新-UPDATE] 国标编号重复: {}, 已分配给{}/{}", - pushStreamMessage.getGbId(), streamPushInDb.getApp(), streamPushInDb.getStream()); - continue; - } - StreamPush streamPush = allAppAndStream.get(app + stream); - streamPush.setUpdateTime(DateUtil.getNow()); - streamPush.setGbDeviceId(pushStreamMessage.getGbId()); - streamPush.setGbName(pushStreamMessage.getName()); - streamPush.setGbStatus(pushStreamMessage.isStatus()?"ON":"OFF"); - //存在就只修改 name和gbId - streamPushItemForUpdate.add(streamPush); - } - } - if (!streamPushItemForSave.isEmpty()) { - log.info("添加{}条",streamPushItemForSave.size()); - log.info(JSONObject.toJSONString(streamPushItemForSave)); - streamPushService.batchAdd(streamPushItemForSave); + @Scheduled(fixedDelay = 100) + public void executeTaskQueue() { + if (taskQueue.isEmpty()) { + return; + } + List messageDataList = new ArrayList<>(); + int size = taskQueue.size(); + for (int i = 0; i < size; i++) { + Message msg = taskQueue.poll(); + if (msg != null) { + messageDataList.add(msg); + } + } + if (messageDataList.isEmpty()) { + return; + } + for (Message msg : messageDataList) { + try { + List streamPushItems = JSON.parseArray(new String(msg.getBody()), RedisPushStreamMessage.class); + //查询全部的app+stream 用于判断是添加还是修改 + Map allAppAndStream = streamPushService.getAllAppAndStreamMap(); + Map allGBId = streamPushService.getAllGBId(); + // 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 + List streamPushItemForSave = new ArrayList<>(); + List streamPushItemForUpdate = new ArrayList<>(); + for (RedisPushStreamMessage pushStreamMessage : streamPushItems) { + String app = pushStreamMessage.getApp(); + String stream = pushStreamMessage.getStream(); + boolean contains = allAppAndStream.containsKey(app + stream); + //不存在就添加 + if (!contains) { + if (allGBId.containsKey(pushStreamMessage.getGbId())) { + StreamPush streamPushInDb = allGBId.get(pushStreamMessage.getGbId()); + log.warn("[REDIS消息-推流设备列表更新-INSERT] 国标编号重复: {}, 已分配给{}/{}", + streamPushInDb.getGbDeviceId(), streamPushInDb.getApp(), streamPushInDb.getStream()); + continue; } - if(!streamPushItemForUpdate.isEmpty()){ - log.info("修改{}条",streamPushItemForUpdate.size()); - log.info(JSONObject.toJSONString(streamPushItemForUpdate)); - streamPushService.batchUpdate(streamPushItemForUpdate); + StreamPush streamPush = pushStreamMessage.buildstreamPush(); + streamPush.setCreateTime(DateUtil.getNow()); + streamPush.setUpdateTime(DateUtil.getNow()); + streamPush.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); + streamPushItemForSave.add(streamPush); + allGBId.put(streamPush.getGbDeviceId(), streamPush); + } else { + StreamPush streamPushForGbDeviceId = allGBId.get(pushStreamMessage.getGbId()); + if (streamPushForGbDeviceId != null + && (!streamPushForGbDeviceId.getApp().equals(pushStreamMessage.getApp()) + || !streamPushForGbDeviceId.getStream().equals(pushStreamMessage.getStream()))) { + StreamPush streamPushInDb = allGBId.get(pushStreamMessage.getGbId()); + log.warn("[REDIS消息-推流设备列表更新-UPDATE] 国标编号重复: {}, 已分配给{}/{}", + pushStreamMessage.getGbId(), streamPushInDb.getApp(), streamPushInDb.getStream()); + continue; } - }catch (Exception e) { - log.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", new String(message.getBody())); - log.error("[REDIS消息-推流设备列表更新] 异常内容: ", e); + StreamPush streamPush = allAppAndStream.get(app + stream); + streamPush.setUpdateTime(DateUtil.getNow()); + streamPush.setGbDeviceId(pushStreamMessage.getGbId()); + streamPush.setGbName(pushStreamMessage.getName()); + streamPush.setGbStatus(pushStreamMessage.isStatus() ? "ON" : "OFF"); + //存在就只修改 name和gbId + streamPushItemForUpdate.add(streamPush); } } - }); + if (!streamPushItemForSave.isEmpty()) { + log.info("添加{}条", streamPushItemForSave.size()); + log.info(JSONObject.toJSONString(streamPushItemForSave)); + streamPushService.batchAdd(streamPushItemForSave); + + } + if (!streamPushItemForUpdate.isEmpty()) { + log.info("修改{}条", streamPushItemForUpdate.size()); + log.info(JSONObject.toJSONString(streamPushItemForUpdate)); + streamPushService.batchUpdate(streamPushItemForUpdate); + } + } catch (Exception e) { + log.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", new String(msg.getBody())); + log.error("[REDIS消息-推流设备列表更新] 异常内容: ", e); + } } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java index 47c93ecf..f8a78dca 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java @@ -7,16 +7,20 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; /** * 接收redis返回的推流结果 + * * @author lin * PUBLISH VM_MSG_STREAM_PUSH_RESPONSE '{"code":0,"msg":"失败","app":"1000","stream":"10000022"}' */ @@ -30,37 +34,49 @@ public class RedisPushStreamResponseListener implements MessageListener { @Autowired private ThreadPoolTaskExecutor taskExecutor; - private Map responseEvents = new ConcurrentHashMap<>(); + private final Map responseEvents = new ConcurrentHashMap<>(); - public interface PushStreamResponseEvent{ + public interface PushStreamResponseEvent { void run(MessageForPushChannelResponse response); } @Override public void onMessage(Message message, byte[] bytes) { - log.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody())); - boolean isEmpty = taskQueue.isEmpty(); + log.info("[REDIS: 推流结果]: {}", new String(message.getBody())); taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); - if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ - log.info("[REDIS消息-请求推流结果]:参数不全"); - continue; - } - // 查看正在等待的invite消息 - if (responseEvents.get(response.getApp() + response.getStream()) != null) { - responseEvents.get(response.getApp() + response.getStream()).run(response); - } - }catch (Exception e) { - log.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); - log.error("[REDIS消息-请求推流结果] 异常内容: ", e); - } + } + + @Scheduled(fixedDelay = 100) + public void executeTaskQueue() { + if (taskQueue.isEmpty()) { + return; + } + List messageDataList = new ArrayList<>(); + int size = taskQueue.size(); + for (int i = 0; i < size; i++) { + Message msg = taskQueue.poll(); + if (msg != null) { + messageDataList.add(msg); + } + } + if (messageDataList.isEmpty()) { + return; + } + for (Message msg : messageDataList) { + try { + MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); + if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())) { + log.info("[REDIS消息-请求推流结果]:参数不全"); + continue; } - }); + // 查看正在等待的invite消息 + if (responseEvents.get(response.getApp() + response.getStream()) != null) { + responseEvents.get(response.getApp() + response.getStream()).run(response); + } + } catch (Exception e) { + log.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(msg)); + log.error("[REDIS消息-请求推流结果] 异常内容: ", e); + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java index 428f743f..2b11cbf3 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java @@ -9,19 +9,21 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; /** * 接收redis发送的推流设备上线下线通知 + * * @author lin * 发送 PUBLISH VM_MSG_PUSH_STREAM_STATUS_CHANGE '{"setAllOffline":false,"offlineStreams":[{"app":"1000","stream":"10000022","timeStamp":1726729716551}]}' * 订阅 SUBSCRIBE VM_MSG_PUSH_STREAM_STATUS_CHANGE @@ -44,48 +46,55 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - @Override public void onMessage(Message message, byte[] bytes) { - boolean isEmpty = taskQueue.isEmpty(); - log.warn("[REDIS消息-推流设备状态变化]: {}", new String(message.getBody())); + log.info("[REDIS: 流设备状态变化]: {}", new String(message.getBody())); taskQueue.offer(message); + } - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - PushStreamStatusChangeFromRedisDto streamStatusMessage = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class); - if (streamStatusMessage == null) { - log.warn("[REDIS消息]推流设备状态变化消息解析失败"); - continue; - } - // 取消定时任务 - dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED); - if (streamStatusMessage.isSetAllOffline()) { - // 所有设备离线 - streamPushService.allOffline(); - } - if (streamStatusMessage.getOfflineStreams() != null - && !streamStatusMessage.getOfflineStreams().isEmpty()) { - // 更新部分设备离线 - streamPushService.offline(streamStatusMessage.getOfflineStreams()); - } - if (streamStatusMessage.getOnlineStreams() != null && - !streamStatusMessage.getOnlineStreams().isEmpty()) { - // 更新部分设备上线 - streamPushService.online(streamStatusMessage.getOnlineStreams()); - } - }catch (Exception e) { - log.warn("[REDIS消息-推流设备状态变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); - log.error("[REDIS消息-推流设备状态变化] 异常内容: ", e); - } + @Scheduled(fixedDelay = 100) + public void executeTaskQueue() { + if (taskQueue.isEmpty()) { + return; + } + List messageDataList = new ArrayList<>(); + int size = taskQueue.size(); + for (int i = 0; i < size; i++) { + Message msg = taskQueue.poll(); + if (msg != null) { + messageDataList.add(msg); + } + } + if (messageDataList.isEmpty()) { + return; + } + for (Message msg : messageDataList) { + try { + PushStreamStatusChangeFromRedisDto streamStatusMessage = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class); + if (streamStatusMessage == null) { + log.warn("[REDIS消息]推流设备状态变化消息解析失败"); + continue; } - }); + // 取消定时任务 + dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED); + if (streamStatusMessage.isSetAllOffline()) { + // 所有设备离线 + streamPushService.allOffline(); + } + if (streamStatusMessage.getOfflineStreams() != null + && !streamStatusMessage.getOfflineStreams().isEmpty()) { + // 更新部分设备离线 + streamPushService.offline(streamStatusMessage.getOfflineStreams()); + } + if (streamStatusMessage.getOnlineStreams() != null && + !streamStatusMessage.getOnlineStreams().isEmpty()) { + // 更新部分设备上线 + streamPushService.online(streamStatusMessage.getOnlineStreams()); + } + } catch (Exception e) { + log.warn("[REDIS消息-推流设备状态变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(msg)); + log.error("[REDIS消息-推流设备状态变化] 异常内容: ", e); + } } } @@ -94,7 +103,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic if (!userSetting.isUsePushingAsStatus()) { // 启动时设置所有推流通道离线,发起查询请求 redisCatchStorage.sendStreamPushRequestedMsgForStatus(); - dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, ()->{ + dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, () -> { log.info("[REDIS消息]未收到redis回复推流设备状态,执行推流设备离线"); // 五秒收不到请求就设置通道离线,然后通知上级离线 streamPushService.allOffline();