From 0b5662b6989b5ae053f441adc80b48ad14fe02f9 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Mon, 3 May 2021 01:49:56 +0800 Subject: [PATCH] =?UTF-8?q?=E9=A1=B9=E7=9B=AE=E7=BB=93=E6=9E=84=E8=B0=83?= =?UTF-8?q?=E6=95=B4=20x=2020=20:=20=E6=8B=86=E5=88=86=E5=87=BA=E7=8B=AC?= =?UTF-8?q?=E7=AB=8B=E7=9A=84=20yudao-spring-boot-starter-mq=20=E7=BB=84?= =?UTF-8?q?=E4=BB=B6=EF=BC=8C=E6=95=B4=E7=90=86=E6=9B=B4=E5=8A=A0=E6=B8=85?= =?UTF-8?q?=E6=99=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yudao-admin-server/pom.xml | 10 ++ .../config/InfConfigRefreshConsumer.java | 2 +- .../config/InfConfigRefreshMessage.java | 2 +- .../mq/producer/config/InfConfigProducer.java | 2 +- .../consumer/dept/SysDeptRefreshConsumer.java | 2 +- .../dict/SysDictDataRefreshConsumer.java | 2 +- .../mq/consumer/mail/SysMailSendConsumer.java | 2 +- .../permission/SysMenuRefreshConsumer.java | 2 +- .../SysRoleMenuRefreshConsumer.java | 2 +- .../permission/SysRoleRefreshConsumer.java | 2 +- .../sms/SysSmsChannelRefreshConsumer.java | 2 +- .../mq/consumer/sms/SysSmsSendConsumer.java | 2 +- .../sms/SysSmsTemplateRefreshConsumer.java | 2 +- .../message/dept/SysDeptRefreshMessage.java | 2 +- .../dict/SysDictDataRefreshMessage.java | 2 +- .../mq/message/mail/SysMailSendMessage.java | 2 +- .../permission/SysMenuRefreshMessage.java | 2 +- .../permission/SysRoleMenuRefreshMessage.java | 2 +- .../permission/SysRoleRefreshMessage.java | 2 +- .../sms/SysSmsChannelRefreshMessage.java | 2 +- .../mq/message/sms/SysSmsSendMessage.java | 2 +- .../sms/SysSmsTemplateRefreshMessage.java | 2 +- .../mq/producer/dept/SysDeptProducer.java | 2 +- .../mq/producer/dict/SysDictDataProducer.java | 2 +- .../producer/permission/SysMenuProducer.java | 2 +- .../permission/SysPermissionProducer.java | 2 +- .../producer/permission/SysRoleProducer.java | 2 +- .../mq/producer/sms/SysSmsProducer.java | 2 +- .../redis/core/stream/RedisStreamTest.java | 2 +- yudao-dependencies/pom.xml | 7 ++ yudao-framework/pom.xml | 4 + .../yudao-spring-boot-starter-mq/pom.xml | 26 +++++ .../mq/config/YudaoMQAutoConfiguration.java | 105 ++++++++++++++++++ .../AbstractChannelMessageListener.java | 2 +- .../mq}/core/pubsub/ChannelMessage.java | 2 +- .../stream/AbstractStreamMessageListener.java | 2 +- .../mq}/core/stream/StreamMessage.java | 2 +- .../mq}/core/util/RedisMessageUtils.java | 6 +- .../yudao/framework/mq/package-info.java | 6 + .../main/resources/META-INF/spring.factories | 2 + .../config/YudaoRedisAutoConfiguration.java | 84 -------------- 41 files changed, 195 insertions(+), 119 deletions(-) create mode 100644 yudao-framework/yudao-spring-boot-starter-mq/pom.xml create mode 100644 yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java rename yudao-framework/{yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis => yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq}/core/pubsub/AbstractChannelMessageListener.java (97%) rename yudao-framework/{yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis => yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq}/core/pubsub/ChannelMessage.java (84%) rename yudao-framework/{yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis => yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq}/core/stream/AbstractStreamMessageListener.java (97%) rename yudao-framework/{yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis => yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq}/core/stream/StreamMessage.java (84%) rename yudao-framework/{yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis => yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq}/core/util/RedisMessageUtils.java (87%) create mode 100644 yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java create mode 100644 yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring.factories diff --git a/yudao-admin-server/pom.xml b/yudao-admin-server/pom.xml index 15fc83132..107b67e6b 100644 --- a/yudao-admin-server/pom.xml +++ b/yudao-admin-server/pom.xml @@ -65,6 +65,12 @@ yudao-spring-boot-starter-job + + + cn.iocoder.boot + yudao-spring-boot-starter-mq + + cn.iocoder.boot @@ -104,6 +110,10 @@ cn.smallbun.screw screw-core + + cn.iocoder.boot + yudao-spring-boot-starter-mq + diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/consumer/config/InfConfigRefreshConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/consumer/config/InfConfigRefreshConsumer.java index 4ab2ba4c2..35d3f791b 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/consumer/config/InfConfigRefreshConsumer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/consumer/config/InfConfigRefreshConsumer.java @@ -1,7 +1,7 @@ package cn.iocoder.yudao.adminserver.modules.infra.mq.consumer.config; import cn.iocoder.yudao.framework.apollo.internals.DBConfigRepository; -import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener; +import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.adminserver.modules.infra.mq.message.config.InfConfigRefreshMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/message/config/InfConfigRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/message/config/InfConfigRefreshMessage.java index a4f6e0116..433048143 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/message/config/InfConfigRefreshMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/message/config/InfConfigRefreshMessage.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.infra.mq.message.config; -import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage; +import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage; import lombok.Data; /** diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/producer/config/InfConfigProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/producer/config/InfConfigProducer.java index 44d52e646..39a8e76b3 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/producer/config/InfConfigProducer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/producer/config/InfConfigProducer.java @@ -1,7 +1,7 @@ package cn.iocoder.yudao.adminserver.modules.infra.mq.producer.config; -import cn.iocoder.yudao.framework.redis.core.util.RedisMessageUtils; import cn.iocoder.yudao.adminserver.modules.infra.mq.message.config.InfConfigRefreshMessage; +import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/dept/SysDeptRefreshConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/dept/SysDeptRefreshConsumer.java index 6673826fd..81bed5083 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/dept/SysDeptRefreshConsumer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/dept/SysDeptRefreshConsumer.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.consumer.dept; -import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener; +import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.adminserver.modules.system.mq.message.dept.SysDeptRefreshMessage; import cn.iocoder.yudao.adminserver.modules.system.service.dept.SysDeptService; import lombok.extern.slf4j.Slf4j; diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/dict/SysDictDataRefreshConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/dict/SysDictDataRefreshConsumer.java index d7ff74d25..08f4b813e 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/dict/SysDictDataRefreshConsumer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/dict/SysDictDataRefreshConsumer.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.consumer.dict; -import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener; +import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.adminserver.modules.system.mq.message.dict.SysDictDataRefreshMessage; import cn.iocoder.yudao.adminserver.modules.system.service.dict.SysDictDataService; import lombok.extern.slf4j.Slf4j; diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/mail/SysMailSendConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/mail/SysMailSendConsumer.java index 38629873c..ff3b017a9 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/mail/SysMailSendConsumer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/mail/SysMailSendConsumer.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.consumer.mail; -import cn.iocoder.yudao.framework.redis.core.stream.AbstractStreamMessageListener; +import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; import cn.iocoder.yudao.adminserver.modules.system.mq.message.mail.SysMailSendMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysMenuRefreshConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysMenuRefreshConsumer.java index 3d2541676..9048e3ccb 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysMenuRefreshConsumer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysMenuRefreshConsumer.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.consumer.permission; -import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener; +import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.adminserver.modules.system.mq.message.permission.SysMenuRefreshMessage; import cn.iocoder.yudao.adminserver.modules.system.service.permission.SysMenuService; import lombok.extern.slf4j.Slf4j; diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysRoleMenuRefreshConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysRoleMenuRefreshConsumer.java index fc8270638..5cdaeef00 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysRoleMenuRefreshConsumer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysRoleMenuRefreshConsumer.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.consumer.permission; -import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener; +import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.adminserver.modules.system.mq.message.permission.SysRoleMenuRefreshMessage; import cn.iocoder.yudao.adminserver.modules.system.service.permission.SysPermissionService; import lombok.extern.slf4j.Slf4j; diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysRoleRefreshConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysRoleRefreshConsumer.java index 8bb2deec3..a2f4cc528 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysRoleRefreshConsumer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/permission/SysRoleRefreshConsumer.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.consumer.permission; -import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener; +import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.adminserver.modules.system.mq.message.permission.SysRoleRefreshMessage; import cn.iocoder.yudao.adminserver.modules.system.service.permission.SysRoleService; import lombok.extern.slf4j.Slf4j; diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsChannelRefreshConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsChannelRefreshConsumer.java index 21c187414..540ff17e7 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsChannelRefreshConsumer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsChannelRefreshConsumer.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.consumer.sms; -import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener; +import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.adminserver.modules.system.mq.message.sms.SysSmsChannelRefreshMessage; import cn.iocoder.yudao.adminserver.modules.system.service.sms.SysSmsChannelService; import lombok.extern.slf4j.Slf4j; diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsSendConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsSendConsumer.java index 68fa3159f..947b23940 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsSendConsumer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsSendConsumer.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.consumer.sms; -import cn.iocoder.yudao.framework.redis.core.stream.AbstractStreamMessageListener; +import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; import cn.iocoder.yudao.adminserver.modules.system.mq.message.sms.SysSmsSendMessage; import cn.iocoder.yudao.adminserver.modules.system.service.sms.SysSmsService; import lombok.extern.slf4j.Slf4j; diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsTemplateRefreshConsumer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsTemplateRefreshConsumer.java index b083243bb..1f4d92ffc 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsTemplateRefreshConsumer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/consumer/sms/SysSmsTemplateRefreshConsumer.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.consumer.sms; -import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener; +import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.adminserver.modules.system.mq.message.sms.SysSmsTemplateRefreshMessage; import cn.iocoder.yudao.adminserver.modules.system.service.sms.SysSmsTemplateService; import lombok.extern.slf4j.Slf4j; diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dept/SysDeptRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dept/SysDeptRefreshMessage.java index 2aa5332a6..724547d68 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dept/SysDeptRefreshMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dept/SysDeptRefreshMessage.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.message.dept; -import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage; +import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage; import lombok.Data; /** diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dict/SysDictDataRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dict/SysDictDataRefreshMessage.java index b4520fbe7..7b735deb9 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dict/SysDictDataRefreshMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dict/SysDictDataRefreshMessage.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.message.dict; -import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage; +import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage; import lombok.Data; /** diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/mail/SysMailSendMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/mail/SysMailSendMessage.java index cfea76b27..bb9f62170 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/mail/SysMailSendMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/mail/SysMailSendMessage.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.message.mail; -import cn.iocoder.yudao.framework.redis.core.stream.StreamMessage; +import cn.iocoder.yudao.framework.mq.core.stream.StreamMessage; import lombok.Data; import javax.validation.constraints.NotNull; diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysMenuRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysMenuRefreshMessage.java index 73d611f4d..1fa2a3879 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysMenuRefreshMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysMenuRefreshMessage.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.message.permission; -import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage; +import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage; import lombok.Data; /** diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleMenuRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleMenuRefreshMessage.java index 03e321c6c..8b9f50c91 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleMenuRefreshMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleMenuRefreshMessage.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.message.permission; -import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage; +import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage; import lombok.Data; /** diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleRefreshMessage.java index 9ae1d94ce..8d8d1e01a 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleRefreshMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleRefreshMessage.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.message.permission; -import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage; +import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage; import lombok.Data; /** diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsChannelRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsChannelRefreshMessage.java index 7379188f8..a37295615 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsChannelRefreshMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsChannelRefreshMessage.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.message.sms; -import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage; +import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage; import lombok.Data; /** diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsSendMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsSendMessage.java index f073640da..34ebf9101 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsSendMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsSendMessage.java @@ -1,7 +1,7 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.message.sms; import cn.iocoder.yudao.framework.common.core.KeyValue; -import cn.iocoder.yudao.framework.redis.core.stream.StreamMessage; +import cn.iocoder.yudao.framework.mq.core.stream.StreamMessage; import lombok.Data; import javax.validation.constraints.NotNull; diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsTemplateRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsTemplateRefreshMessage.java index 9472f8bf7..c8bb00af2 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsTemplateRefreshMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsTemplateRefreshMessage.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.message.sms; -import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage; +import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage; import lombok.Data; /** diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dept/SysDeptProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dept/SysDeptProducer.java index b4c974a26..948796e22 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dept/SysDeptProducer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dept/SysDeptProducer.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.producer.dept; -import cn.iocoder.yudao.framework.redis.core.util.RedisMessageUtils; +import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils; import cn.iocoder.yudao.adminserver.modules.system.mq.message.dept.SysDeptRefreshMessage; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dict/SysDictDataProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dict/SysDictDataProducer.java index 5eb29ebff..ea0183722 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dict/SysDictDataProducer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dict/SysDictDataProducer.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.producer.dict; -import cn.iocoder.yudao.framework.redis.core.util.RedisMessageUtils; +import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils; import cn.iocoder.yudao.adminserver.modules.system.mq.message.dict.SysDictDataRefreshMessage; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysMenuProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysMenuProducer.java index 4e1e6a659..6b3493469 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysMenuProducer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysMenuProducer.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.producer.permission; -import cn.iocoder.yudao.framework.redis.core.util.RedisMessageUtils; +import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils; import cn.iocoder.yudao.adminserver.modules.system.mq.message.permission.SysMenuRefreshMessage; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysPermissionProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysPermissionProducer.java index 310fefd87..d9a1bfcc2 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysPermissionProducer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysPermissionProducer.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.producer.permission; -import cn.iocoder.yudao.framework.redis.core.util.RedisMessageUtils; +import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils; import cn.iocoder.yudao.adminserver.modules.system.mq.message.permission.SysRoleMenuRefreshMessage; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysRoleProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysRoleProducer.java index 09579e7e3..6888f27bc 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysRoleProducer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysRoleProducer.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.producer.permission; -import cn.iocoder.yudao.framework.redis.core.util.RedisMessageUtils; +import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils; import cn.iocoder.yudao.adminserver.modules.system.mq.message.permission.SysRoleRefreshMessage; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/sms/SysSmsProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/sms/SysSmsProducer.java index 53aa97321..81236287c 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/sms/SysSmsProducer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/sms/SysSmsProducer.java @@ -1,7 +1,7 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.producer.sms; import cn.iocoder.yudao.framework.common.core.KeyValue; -import cn.iocoder.yudao.framework.redis.core.util.RedisMessageUtils; +import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils; import cn.iocoder.yudao.adminserver.modules.system.mq.message.sms.SysSmsChannelRefreshMessage; import cn.iocoder.yudao.adminserver.modules.system.mq.message.sms.SysSmsSendMessage; import cn.iocoder.yudao.adminserver.modules.system.mq.message.sms.SysSmsTemplateRefreshMessage; diff --git a/yudao-admin-server/src/test-integration/java/cn/iocoder/yudao/adminserver/framework/redis/core/stream/RedisStreamTest.java b/yudao-admin-server/src/test-integration/java/cn/iocoder/yudao/adminserver/framework/redis/core/stream/RedisStreamTest.java index eea5c00a5..727b25750 100644 --- a/yudao-admin-server/src/test-integration/java/cn/iocoder/yudao/adminserver/framework/redis/core/stream/RedisStreamTest.java +++ b/yudao-admin-server/src/test-integration/java/cn/iocoder/yudao/adminserver/framework/redis/core/stream/RedisStreamTest.java @@ -6,7 +6,7 @@ import cn.iocoder.yudao.adminserver.modules.system.mq.consumer.mail.SysMailSendC import cn.iocoder.yudao.adminserver.modules.system.mq.consumer.sms.SysSmsSendConsumer; import cn.iocoder.yudao.adminserver.modules.system.mq.message.mail.SysMailSendMessage; import cn.iocoder.yudao.adminserver.modules.system.mq.message.sms.SysSmsSendMessage; -import cn.iocoder.yudao.framework.redis.core.util.RedisMessageUtils; +import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.springframework.context.annotation.Import; diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml index 7522fff9a..7fab31091 100644 --- a/yudao-dependencies/pom.xml +++ b/yudao-dependencies/pom.xml @@ -193,6 +193,13 @@ ${revision} + + + cn.iocoder.boot + yudao-spring-boot-starter-mq + ${revision} + + cn.iocoder.boot diff --git a/yudao-framework/pom.xml b/yudao-framework/pom.xml index 9363279f6..635dc7cf6 100644 --- a/yudao-framework/pom.xml +++ b/yudao-framework/pom.xml @@ -15,12 +15,16 @@ yudao-spring-boot-starter-redis yudao-spring-boot-starter-web yudao-spring-boot-starter-security + yudao-spring-boot-starter-monitor yudao-spring-boot-starter-protection yudao-spring-boot-starter-config yudao-spring-boot-starter-job + yudao-spring-boot-starter-mq + yudao-spring-boot-starter-excel yudao-spring-boot-starter-test + yudao-spring-boot-starter-biz-operatelog yudao-spring-boot-starter-biz-dict yudao-spring-boot-starter-biz-sms diff --git a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml new file mode 100644 index 000000000..caef6f420 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml @@ -0,0 +1,26 @@ + + + + cn.iocoder.boot + yudao-framework + ${revision} + + 4.0.0 + yudao-spring-boot-starter-mq + jar + + ${artifactId} + 消息队列,基于 Redis Pub/Sub 实现广播消费,基于 Stream 实现集群消费 + https://github.com/YunaiV/ruoyi-vue-pro + + + + + cn.iocoder.boot + yudao-spring-boot-starter-redis + + + + diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java new file mode 100644 index 000000000..e105dd8f3 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java @@ -0,0 +1,105 @@ +package cn.iocoder.yudao.framework.mq.config; + +import cn.hutool.system.SystemUtil; +import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; +import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.stream.Consumer; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.ReadOffset; +import org.springframework.data.redis.connection.stream.StreamOffset; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.listener.ChannelTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.data.redis.stream.StreamMessageListenerContainer; + +import java.util.List; + +/** + * 消息队列配置类 + * + * @author 芋道源码 + */ +@Configuration +@AutoConfigureAfter(YudaoMQAutoConfiguration.class) +@Slf4j +public class YudaoMQAutoConfiguration { + + /** + * 创建 Redis Pub/Sub 广播消费的容器 + */ + @Bean + public RedisMessageListenerContainer redisMessageListenerContainer( + RedisConnectionFactory factory, List> listeners) { + // 创建 RedisMessageListenerContainer 对象 + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + // 设置 RedisConnection 工厂。 + container.setConnectionFactory(factory); + // 添加监听器 + listeners.forEach(listener -> { + container.addMessageListener(listener, new ChannelTopic(listener.getChannel())); + log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]", + listener.getChannel(), listener.getClass().getName()); + }); + return container; + } + + /** + * 创建 Redis Stream 集群消费的容器 + * + * Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html + */ + @Bean(initMethod = "start", destroyMethod = "stop") + public StreamMessageListenerContainer> redisStreamMessageListenerContainer( + RedisTemplate redisTemplate, List> listeners) { + // 第一步,创建 StreamMessageListenerContainer 容器 + // 创建 options 配置 + StreamMessageListenerContainer.StreamMessageListenerContainerOptions> containerOptions = + StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() + .batchSize(10) // 一次性最多拉取多少条消息 + .targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化 + .build(); + // 创建 container 对象 + StreamMessageListenerContainer> container = StreamMessageListenerContainer.create( + redisTemplate.getRequiredConnectionFactory(), containerOptions); + + // 第二步,注册监听器,消费对应的 Stream 主题 + String consumerName = buildConsumerName(); +// String consumerName = "110"; + listeners.forEach(listener -> { + // 创建 listener 对应的消费者分组 + try { + redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup()); + } catch (Exception ignore) {} + // 设置 listener 对应的 redisTemplate + listener.setRedisTemplate(redisTemplate); + // 创建 Consumer 对象 + Consumer consumer = Consumer.from(listener.getGroup(), consumerName); + // 设置 Consumer 消费进度,以最小消费进度为准 + StreamOffset streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed()); + // 设置 Consumer 监听 + StreamMessageListenerContainer.StreamReadRequestBuilder builder = StreamMessageListenerContainer.StreamReadRequest + .builder(streamOffset).consumer(consumer) + .autoAcknowledge(false) // 不自动 ack + .cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false + container.register(builder.build(), listener); + }); + return container; + } + + /** + * 构建消费者名字,使用本地 IP + 进程编号的方式。 + * 参考自 RocketMQ clientId 的实现 + * + * @return 消费者名字 + */ + private static String buildConsumerName() { + return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID()); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/pubsub/AbstractChannelMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java similarity index 97% rename from yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/pubsub/AbstractChannelMessageListener.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java index 8d3bf6fc1..9905a08ed 100644 --- a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/pubsub/AbstractChannelMessageListener.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.framework.redis.core.pubsub; +package cn.iocoder.yudao.framework.mq.core.pubsub; import cn.hutool.core.util.TypeUtil; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; diff --git a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/pubsub/ChannelMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/ChannelMessage.java similarity index 84% rename from yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/pubsub/ChannelMessage.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/ChannelMessage.java index 60e5f494f..ff55f8b01 100644 --- a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/pubsub/ChannelMessage.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/ChannelMessage.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.framework.redis.core.pubsub; +package cn.iocoder.yudao.framework.mq.core.pubsub; import com.fasterxml.jackson.annotation.JsonIgnore; diff --git a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/stream/AbstractStreamMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java similarity index 97% rename from yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/stream/AbstractStreamMessageListener.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java index 85d44f4b1..612b5a029 100644 --- a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/stream/AbstractStreamMessageListener.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.framework.redis.core.stream; +package cn.iocoder.yudao.framework.mq.core.stream; import cn.hutool.core.util.TypeUtil; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; diff --git a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/stream/StreamMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/StreamMessage.java similarity index 84% rename from yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/stream/StreamMessage.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/StreamMessage.java index ecb9c7409..30b38c62d 100644 --- a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/stream/StreamMessage.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/StreamMessage.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.framework.redis.core.stream; +package cn.iocoder.yudao.framework.mq.core.stream; import com.fasterxml.jackson.annotation.JsonIgnore; diff --git a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/util/RedisMessageUtils.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/util/RedisMessageUtils.java similarity index 87% rename from yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/util/RedisMessageUtils.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/util/RedisMessageUtils.java index 0d8358699..57c925fa7 100644 --- a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/core/util/RedisMessageUtils.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/util/RedisMessageUtils.java @@ -1,7 +1,7 @@ -package cn.iocoder.yudao.framework.redis.core.util; +package cn.iocoder.yudao.framework.mq.core.util; -import cn.iocoder.yudao.framework.redis.core.pubsub.ChannelMessage; -import cn.iocoder.yudao.framework.redis.core.stream.StreamMessage; +import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage; +import cn.iocoder.yudao.framework.mq.core.stream.StreamMessage; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.connection.stream.StreamRecords; diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java new file mode 100644 index 000000000..48eaf2386 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java @@ -0,0 +1,6 @@ +/** + * 消息队列,基于 Redis 提供: + * 1. 基于 Pub/Sub 实现广播消费 + * 2. 基于 Stream 实现集群消费 + */ +package cn.iocoder.yudao.framework.mq; diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring.factories b/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring.factories new file mode 100644 index 000000000..d4ca5b91d --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + cn.iocoder.yudao.framework.mq.config.YudaoMQAutoConfiguration diff --git a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/config/YudaoRedisAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/config/YudaoRedisAutoConfiguration.java index 4e9f0a002..8f4b5ad6c 100644 --- a/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/config/YudaoRedisAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-redis/src/main/java/cn/iocoder/yudao/framework/redis/config/YudaoRedisAutoConfiguration.java @@ -1,23 +1,11 @@ package cn.iocoder.yudao.framework.redis.config; -import cn.hutool.system.SystemUtil; -import cn.iocoder.yudao.framework.redis.core.pubsub.AbstractChannelMessageListener; -import cn.iocoder.yudao.framework.redis.core.stream.AbstractStreamMessageListener; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.connection.stream.Consumer; -import org.springframework.data.redis.connection.stream.ObjectRecord; -import org.springframework.data.redis.connection.stream.ReadOffset; -import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.listener.ChannelTopic; -import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.serializer.RedisSerializer; -import org.springframework.data.redis.stream.StreamMessageListenerContainer; - -import java.util.List; /** * Redis 配置类 @@ -44,76 +32,4 @@ public class YudaoRedisAutoConfiguration { return template; } - /** - * 创建 Redis Pub/Sub 广播消费的容器 - */ - @Bean - public RedisMessageListenerContainer redisMessageListenerContainer( - RedisConnectionFactory factory, List> listeners) { - // 创建 RedisMessageListenerContainer 对象 - RedisMessageListenerContainer container = new RedisMessageListenerContainer(); - // 设置 RedisConnection 工厂。 - container.setConnectionFactory(factory); - // 添加监听器 - listeners.forEach(listener -> { - container.addMessageListener(listener, new ChannelTopic(listener.getChannel())); - log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]", - listener.getChannel(), listener.getClass().getName()); - }); - return container; - } - - /** - * 创建 Redis Stream 集群消费的容器 - * - * Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html - */ - @Bean(initMethod = "start", destroyMethod = "stop") - public StreamMessageListenerContainer> redisStreamMessageListenerContainer( - RedisTemplate redisTemplate, List> listeners) { - // 第一步,创建 StreamMessageListenerContainer 容器 - // 创建 options 配置 - StreamMessageListenerContainer.StreamMessageListenerContainerOptions> containerOptions = - StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() - .batchSize(10) // 一次性最多拉取多少条消息 - .targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化 - .build(); - // 创建 container 对象 - StreamMessageListenerContainer> container = StreamMessageListenerContainer.create( - redisTemplate.getRequiredConnectionFactory(), containerOptions); - - // 第二步,注册监听器,消费对应的 Stream 主题 - String consumerName = buildConsumerName(); -// String consumerName = "110"; - listeners.forEach(listener -> { - // 创建 listener 对应的消费者分组 - try { - redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup()); - } catch (Exception ignore) {} - // 设置 listener 对应的 redisTemplate - listener.setRedisTemplate(redisTemplate); - // 创建 Consumer 对象 - Consumer consumer = Consumer.from(listener.getGroup(), consumerName); - // 设置 Consumer 消费进度,以最小消费进度为准 - StreamOffset streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed()); - // 设置 Consumer 监听 - StreamMessageListenerContainer.StreamReadRequestBuilder builder = StreamMessageListenerContainer.StreamReadRequest - .builder(streamOffset).consumer(consumer) - .autoAcknowledge(false) // 不自动 ack - .cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false - container.register(builder.build(), listener); - }); - return container; - } - - /** - * 构建消费者名字,使用本地 IP + 进程编号的方式。 - * 参考自 RocketMQ clientId 的实现 - * - * @return 消费者名字 - */ - private static String buildConsumerName() { - return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID()); - } - }