From 0f31ce7ecf0b872f8cf8cc83df66886fa8488140 Mon Sep 17 00:00:00 2001 From: gaibu <1016771049@qq.com> Date: Thu, 15 Dec 2022 14:36:17 +0800 Subject: [PATCH 1/7] =?UTF-8?q?fix:=20=E8=A7=A3=E5=86=B3=20redis=20mq=20?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E4=B8=A2=E5=A4=B1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mq/config/YudaoMQAutoConfiguration.java | 11 +++- .../mq/scheduler/PendingMessageScheduler.java | 66 +++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) create mode 100644 yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java index c369d49d6..9c70a1246 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java @@ -8,6 +8,7 @@ import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; +import cn.iocoder.yudao.framework.mq.scheduler.PendingMessageScheduler; import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.AutoConfiguration; @@ -24,7 +25,6 @@ import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.stream.DefaultStreamMessageListenerContainerX; import org.springframework.data.redis.stream.StreamMessageListenerContainer; -import org.springframework.scheduling.annotation.Async; import java.util.List; import java.util.Properties; @@ -69,6 +69,15 @@ public class YudaoMQAutoConfiguration { return container; } + /** + * + * @return + */ + @Bean + public PendingMessageScheduler pendingMessageScheduler(){ + return new PendingMessageScheduler(); + } + /** * 创建 Redis Stream 集群消费的容器 * diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java new file mode 100644 index 000000000..38d8df3bb --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java @@ -0,0 +1,66 @@ +package cn.iocoder.yudao.framework.mq.scheduler; + +import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.connection.stream.Consumer; +import org.springframework.data.redis.connection.stream.MapRecord; +import org.springframework.data.redis.connection.stream.PendingMessagesSummary; +import org.springframework.data.redis.connection.stream.ReadOffset; +import org.springframework.data.redis.connection.stream.StreamOffset; +import org.springframework.data.redis.connection.stream.StreamRecords; +import org.springframework.data.redis.core.StreamOperations; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; + +import java.util.List; +import java.util.Map; + +/** + * 这个定时器用于处理,crash 之后的消费者未消费完的消息 + */ +@Slf4j +@EnableScheduling +public class PendingMessageScheduler { + + @Autowired + private List> listeners; + @Autowired + private RedisMQTemplate redisTemplate; + @Value("${spring.application.name}") + private String groupName; + + /** + * 一分钟执行一次 + */ + @Scheduled(fixedRate = 60 * 1000) + public void processPendingMessage() { + StreamOperations ops = redisTemplate.getRedisTemplate().opsForStream(); + + for (AbstractStreamMessageListener listener : listeners) { + PendingMessagesSummary pendingMessagesSummary = ops.pending(listener.getStreamKey(), groupName); + // 每个消费者的pending消息数量 + Map pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer(); + pendingMessagesPerConsumer.entrySet().forEach(entry -> { + String consumerName = entry.getKey(); + Long pendingMessageCount = entry.getValue(); + log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount); + + // 从消费者的pending队列中读取消息 + List> retVal = ops.read(Consumer.from(groupName, consumerName), StreamOffset.create(listener.getStreamKey(), ReadOffset.from("0"))); + + for (MapRecord record : retVal) { + // 重新投递消息 + redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord() + .ofObject(record.getValue()) // 设置内容 + .withStreamKey(listener.getStreamKey())); + + // ack 消息消费完成 + redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, record); + } + }); + } + } +} From a6c92816f0fb5801c27f6dc74cc0ac9542622b86 Mon Sep 17 00:00:00 2001 From: gaibu <1016771049@qq.com> Date: Thu, 15 Dec 2022 15:33:15 +0800 Subject: [PATCH 2/7] =?UTF-8?q?fix:=20=E8=A7=A3=E5=86=B3=20redis=20mq=20?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E4=B8=A2=E5=A4=B1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mq/scheduler/PendingMessageScheduler.java | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java index 38d8df3bb..1d5825a10 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java @@ -3,6 +3,8 @@ package cn.iocoder.yudao.framework.mq.scheduler; import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.connection.stream.Consumer; @@ -17,6 +19,7 @@ import org.springframework.scheduling.annotation.Scheduled; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * 这个定时器用于处理,crash 之后的消费者未消费完的消息 @@ -24,19 +27,35 @@ import java.util.Map; @Slf4j @EnableScheduling public class PendingMessageScheduler { - + private static final String LOCK_KEY = "redis:pending:msg:lock"; @Autowired private List> listeners; @Autowired private RedisMQTemplate redisTemplate; @Value("${spring.application.name}") private String groupName; + @Autowired + private RedissonClient redissonClient; /** * 一分钟执行一次 */ @Scheduled(fixedRate = 60 * 1000) public void processPendingMessage() { + final RLock lock = redissonClient.getLock(LOCK_KEY); + try { + // 尝试加锁,最多等待 30 秒,上锁以后 60 秒自动解锁 + boolean lockFlag = lock.tryLock(30, 60, TimeUnit.SECONDS); + if (lockFlag) { + execute(); + } + } catch (InterruptedException e) { + log.error("获取锁失败", e); + } + + } + + private void execute() { StreamOperations ops = redisTemplate.getRedisTemplate().opsForStream(); for (AbstractStreamMessageListener listener : listeners) { From 46f3790492492bfaee83cf33039a445de9787b6d Mon Sep 17 00:00:00 2001 From: gaibu <1016771049@qq.com> Date: Fri, 23 Dec 2022 10:28:59 +0800 Subject: [PATCH 3/7] =?UTF-8?q?fix:=20=E8=A7=A3=E5=86=B3=20redis=20mq=20?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E4=B8=A2=E5=A4=B1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mq/config/YudaoMQAutoConfiguration.java | 21 ++++-- ...java => RedisPendingMessageResendJob.java} | 74 ++++++++++--------- 2 files changed, 52 insertions(+), 43 deletions(-) rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/{PendingMessageScheduler.java => RedisPendingMessageResendJob.java} (52%) diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java index 9c70a1246..e306389be 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java @@ -8,9 +8,11 @@ import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; -import cn.iocoder.yudao.framework.mq.scheduler.PendingMessageScheduler; +import cn.iocoder.yudao.framework.mq.scheduler.RedisPendingMessageResendJob; import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration; import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RedissonClient; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.connection.RedisServerCommands; @@ -25,6 +27,7 @@ import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.stream.DefaultStreamMessageListenerContainerX; import org.springframework.data.redis.stream.StreamMessageListenerContainer; +import org.springframework.scheduling.annotation.EnableScheduling; import java.util.List; import java.util.Properties; @@ -35,6 +38,7 @@ import java.util.Properties; * @author 芋道源码 */ @Slf4j +@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息 @AutoConfiguration(after = YudaoRedisAutoConfiguration.class) public class YudaoMQAutoConfiguration { @@ -70,17 +74,19 @@ public class YudaoMQAutoConfiguration { } /** - * - * @return + * 创建 Redis Stream 重新消费的任务 */ @Bean - public PendingMessageScheduler pendingMessageScheduler(){ - return new PendingMessageScheduler(); + public RedisPendingMessageResendJob redisPendingMessageResendJob(List> listeners, + RedisMQTemplate redisTemplate, + @Value("${spring.application.name}") String groupName, + RedissonClient redissonClient) { + return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient); } /** * 创建 Redis Stream 集群消费的容器 - * + *

* Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html */ @Bean(initMethod = "start", destroyMethod = "stop") @@ -108,7 +114,8 @@ public class YudaoMQAutoConfiguration { // 创建 listener 对应的消费者分组 try { redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup()); - } catch (Exception ignore) {} + } catch (Exception ignore) { + } // 设置 listener 对应的 redisTemplate listener.setRedisMQTemplate(redisMQTemplate); // 创建 Consumer 对象 diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java similarity index 52% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java index 1d5825a10..2f64ab498 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java @@ -1,12 +1,11 @@ package cn.iocoder.yudao.framework.mq.scheduler; +import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.PendingMessagesSummary; @@ -14,51 +13,53 @@ import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.connection.stream.StreamRecords; import org.springframework.data.redis.core.StreamOperations; -import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; /** - * 这个定时器用于处理,crash 之后的消费者未消费完的消息 + * 这个任务用于处理,crash 之后的消费者未消费完的消息 */ @Slf4j -@EnableScheduling -public class PendingMessageScheduler { +public class RedisPendingMessageResendJob { private static final String LOCK_KEY = "redis:pending:msg:lock"; - @Autowired - private List> listeners; - @Autowired - private RedisMQTemplate redisTemplate; - @Value("${spring.application.name}") - private String groupName; - @Autowired - private RedissonClient redissonClient; + + private final List> listeners; + private final RedisMQTemplate redisTemplate; + private final String groupName; + private final RedissonClient redissonClient; + + public RedisPendingMessageResendJob(List> listeners, RedisMQTemplate redisTemplate, String groupName, RedissonClient redissonClient) { + this.listeners = listeners; + this.redisTemplate = redisTemplate; + this.groupName = groupName; + this.redissonClient = redissonClient; + } /** - * 一分钟执行一次 + * 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题 */ - @Scheduled(fixedRate = 60 * 1000) - public void processPendingMessage() { - final RLock lock = redissonClient.getLock(LOCK_KEY); - try { - // 尝试加锁,最多等待 30 秒,上锁以后 60 秒自动解锁 - boolean lockFlag = lock.tryLock(30, 60, TimeUnit.SECONDS); - if (lockFlag) { + @Scheduled(cron = "35 * * * * ?") + public void messageResend() { + RLock lock = redissonClient.getLock(LOCK_KEY); + log.info("[messageResend][尝试获取锁]"); + // 尝试加锁 + if (lock.tryLock()) { + try { execute(); + } catch (Exception ex) { + log.error("[messageResend][执行异常]", ex); + } finally { + lock.unlock(); } - } catch (InterruptedException e) { - log.error("获取锁失败", e); } - } private void execute() { StreamOperations ops = redisTemplate.getRedisTemplate().opsForStream(); - for (AbstractStreamMessageListener listener : listeners) { + listeners.forEach(listener -> { PendingMessagesSummary pendingMessagesSummary = ops.pending(listener.getStreamKey(), groupName); // 每个消费者的pending消息数量 Map pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer(); @@ -69,17 +70,18 @@ public class PendingMessageScheduler { // 从消费者的pending队列中读取消息 List> retVal = ops.read(Consumer.from(groupName, consumerName), StreamOffset.create(listener.getStreamKey(), ReadOffset.from("0"))); + if (CollUtil.isNotEmpty(retVal)) { + for (MapRecord record : retVal) { + // 重新投递消息 + redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord() + .ofObject(record.getValue()) // 设置内容 + .withStreamKey(listener.getStreamKey())); - for (MapRecord record : retVal) { - // 重新投递消息 - redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord() - .ofObject(record.getValue()) // 设置内容 - .withStreamKey(listener.getStreamKey())); - - // ack 消息消费完成 - redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, record); + // ack 消息消费完成 + redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, record); + } } }); - } + }); } } From e9645d7054db531e02910c7ea89f35b09aea2fe4 Mon Sep 17 00:00:00 2001 From: gaibu <1016771049@qq.com> Date: Fri, 23 Dec 2022 10:31:41 +0800 Subject: [PATCH 4/7] =?UTF-8?q?fix:=20=E8=A7=A3=E5=86=B3=20redis=20mq=20?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E4=B8=A2=E5=A4=B1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mq/scheduler/RedisPendingMessageResendJob.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java index 2f64ab498..59d9a0489 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java @@ -61,17 +61,17 @@ public class RedisPendingMessageResendJob { listeners.forEach(listener -> { PendingMessagesSummary pendingMessagesSummary = ops.pending(listener.getStreamKey(), groupName); - // 每个消费者的pending消息数量 + // 每个消费者的 pending 队列消息数量 Map pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer(); pendingMessagesPerConsumer.entrySet().forEach(entry -> { String consumerName = entry.getKey(); Long pendingMessageCount = entry.getValue(); log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount); - // 从消费者的pending队列中读取消息 - List> retVal = ops.read(Consumer.from(groupName, consumerName), StreamOffset.create(listener.getStreamKey(), ReadOffset.from("0"))); - if (CollUtil.isNotEmpty(retVal)) { - for (MapRecord record : retVal) { + // 从消费者的 pending 队列中读取消息 + List> records = ops.read(Consumer.from(groupName, consumerName), StreamOffset.create(listener.getStreamKey(), ReadOffset.from("0"))); + if (CollUtil.isNotEmpty(records)) { + for (MapRecord record : records) { // 重新投递消息 redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord() .ofObject(record.getValue()) // 设置内容 From 2d7d3f151177f407b9828e295a12c7686afaf34b Mon Sep 17 00:00:00 2001 From: gaibu <1016771049@qq.com> Date: Fri, 23 Dec 2022 10:32:18 +0800 Subject: [PATCH 5/7] =?UTF-8?q?fix:=20=E8=A7=A3=E5=86=B3=20redis=20mq=20?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E4=B8=A2=E5=A4=B1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../framework/mq/scheduler/RedisPendingMessageResendJob.java | 1 - 1 file changed, 1 deletion(-) diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java index 59d9a0489..2a96805db 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java @@ -43,7 +43,6 @@ public class RedisPendingMessageResendJob { @Scheduled(cron = "35 * * * * ?") public void messageResend() { RLock lock = redissonClient.getLock(LOCK_KEY); - log.info("[messageResend][尝试获取锁]"); // 尝试加锁 if (lock.tryLock()) { try { From 0e6b4e463f4e49edea6642c15af7ba87099f2ebc Mon Sep 17 00:00:00 2001 From: gaibu <1016771049@qq.com> Date: Fri, 23 Dec 2022 10:33:38 +0800 Subject: [PATCH 6/7] =?UTF-8?q?fix:=20=E8=A7=A3=E5=86=B3=20redis=20mq=20?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E4=B8=A2=E5=A4=B1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yudao/framework/mq/config/YudaoMQAutoConfiguration.java | 2 +- .../mq/{scheduler => job}/RedisPendingMessageResendJob.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/{scheduler => job}/RedisPendingMessageResendJob.java (98%) diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java index e306389be..042a4f736 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java @@ -8,7 +8,7 @@ import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; -import cn.iocoder.yudao.framework.mq.scheduler.RedisPendingMessageResendJob; +import cn.iocoder.yudao.framework.mq.job.RedisPendingMessageResendJob; import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RedissonClient; diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java similarity index 98% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java index 2a96805db..ff01e3e3a 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.framework.mq.scheduler; +package cn.iocoder.yudao.framework.mq.job; import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; From 604266d33c6b29a829282294b02d076e8fbe1b17 Mon Sep 17 00:00:00 2001 From: gaibu <1016771049@qq.com> Date: Tue, 27 Dec 2022 22:52:19 +0800 Subject: [PATCH 7/7] =?UTF-8?q?fix:=20=E8=A7=A3=E5=86=B3=20redis=20mq=20?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E4=B8=A2=E5=A4=B1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mq/job/RedisPendingMessageResendJob.java | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java index ff01e3e3a..f4ba050c0 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java @@ -3,6 +3,7 @@ package cn.iocoder.yudao.framework.mq.job; import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; @@ -22,7 +23,9 @@ import java.util.Map; * 这个任务用于处理,crash 之后的消费者未消费完的消息 */ @Slf4j +@AllArgsConstructor public class RedisPendingMessageResendJob { + private static final String LOCK_KEY = "redis:pending:msg:lock"; private final List> listeners; @@ -30,13 +33,6 @@ public class RedisPendingMessageResendJob { private final String groupName; private final RedissonClient redissonClient; - public RedisPendingMessageResendJob(List> listeners, RedisMQTemplate redisTemplate, String groupName, RedissonClient redissonClient) { - this.listeners = listeners; - this.redisTemplate = redisTemplate; - this.groupName = groupName; - this.redissonClient = redissonClient; - } - /** * 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题 */ @@ -57,28 +53,26 @@ public class RedisPendingMessageResendJob { private void execute() { StreamOperations ops = redisTemplate.getRedisTemplate().opsForStream(); - listeners.forEach(listener -> { PendingMessagesSummary pendingMessagesSummary = ops.pending(listener.getStreamKey(), groupName); // 每个消费者的 pending 队列消息数量 Map pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer(); - pendingMessagesPerConsumer.entrySet().forEach(entry -> { - String consumerName = entry.getKey(); - Long pendingMessageCount = entry.getValue(); + pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> { log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount); // 从消费者的 pending 队列中读取消息 List> records = ops.read(Consumer.from(groupName, consumerName), StreamOffset.create(listener.getStreamKey(), ReadOffset.from("0"))); - if (CollUtil.isNotEmpty(records)) { - for (MapRecord record : records) { - // 重新投递消息 - redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord() - .ofObject(record.getValue()) // 设置内容 - .withStreamKey(listener.getStreamKey())); + if (CollUtil.isEmpty(records)) { + return; + } + for (MapRecord record : records) { + // 重新投递消息 + redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord() + .ofObject(record.getValue()) // 设置内容 + .withStreamKey(listener.getStreamKey())); - // ack 消息消费完成 - redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, record); - } + // ack 消息消费完成 + redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, record); } }); });