1 idea新建项目
配置文件如下:
- 01
- 02
- 03
- 04
- 05
- 06
- 07
- 08
- 09
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.14.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>xyz.foraixh</groupId>
<artifactId>redis-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>redis-demo</name>
<description>redis-demo</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.redisson/redisson-spring-boot-starter -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.15.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2 redis相关配置项
- 01
- 02
- 03
- 04
- 05
- 06
- 07
- 08
spring:
redis:
host: 127.0.0.1
port: 6379
timeout: 5000
# 无密码可不填
# password: password
3 定义延时队列消费接口
- 01
- 02
- 03
- 04
- 05
- 06
public interface RedisDelayedQueueListener<T> extends Consumer<T> {
/**
* 测试延时队列名称
*/
String TEST_LISTENER = "TEST_LISTENER";
}
3 定义延时队列测试消费者实现类
- 01
- 02
- 03
- 04
- 05
- 06
- 07
- 08
- 09
public class TestListener implements RedisDelayedQueueListener<String> {
private static final Logger log = LoggerFactory.getLogger(TestListener.class);
public void accept(String s) {
log.info("listener test: {}", s);
}
}
4 定义redis延时队列
- 01
- 02
- 03
- 04
- 05
- 06
- 07
- 08
- 09
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
public class RedisDelayedQueue {
private static final Logger log = LoggerFactory.getLogger(RedisDelayedQueue.class);
private final RedissonClient redissonClient;
public RedisDelayedQueue(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
/**
* 添加队列
* @param queueName 延时队列名称
* @param t DTO传输类
* @param delay 时间数量
* @param timeUnit 时间单位
* @param <T> 泛型
*/
public <T> void addItem(T t, long delay, TimeUnit timeUnit, String queueName) {
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
delayedQueue.offer(t, delay, timeUnit);
}
}
5 定义延时队列启动配置
配置延时队列专用线程池
- 01
- 02
- 03
- 04
- 05
- 06
- 07
- 08
- 09
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
public class RedisThreadPoolConfiguration {
public ExecutorService redisDelayedQueueConsumerPool() {
return new ThreadPoolExecutor(10, 10,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new DefaultThreadFactory("redis-delayed-queue-consumer-pool"));
}
/**
* 默认的线程池工厂类;可自定义线程名称前缀
*/
public static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory(String prefix) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = prefix + "-pool-" +
POOL_NUMBER.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
}
配置延时队列初始化
- 01
- 02
- 03
- 04
- 05
- 06
- 07
- 08
- 09
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
public class RedisConfiguration implements ApplicationContextAware {
private static final Logger log = LoggerFactory.getLogger(RedisConfiguration.class);
private final RedissonClient redissonClient;
private final ExecutorService redisDelayedQueueConsumerPool;
public RedisConfiguration(RedissonClient redissonClient, ExecutorService redisDelayedQueueConsumerPool) {
this.redissonClient = redissonClient;
this.redisDelayedQueueConsumerPool = redisDelayedQueueConsumerPool;
}
public RedisDelayedQueue redisDelayedQueue() {
return new RedisDelayedQueue(redissonClient);
}
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
applicationContext.getBeansOfType(RedisDelayedQueueListener.class).forEach(this::listenerStart);
}
/**
* 启动线程获取队列*
* @param queueName queueName
* @param redisDelayedQueueListener 任务回调监听
* @param <T> 泛型
*/
private <T> void listenerStart(String queueName, RedisDelayedQueueListener<T> redisDelayedQueueListener) {
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
redisDelayedQueueConsumerPool.execute(() -> {
T t;
while (true) {
try {
t = blockingFairQueue.take();
redisDelayedQueueListener.accept(t);
} catch (InterruptedException e) {
log.error("处理错误", e);
}
}
});
}
}
6 测试controller
- 01
- 02
- 03
- 04
- 05
- 06
- 07
- 08
- 09
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
public class TestController {
private static final Logger log = LoggerFactory.getLogger(TestController.class);
private final RedisDelayedQueue redisDelayedQueue;
public TestController(RedisDelayedQueue redisDelayedQueue) {
this.redisDelayedQueue = redisDelayedQueue;
}
public String redisDelayQueueTest() {
redisDelayedQueue.addItem("延时测试", 5, TimeUnit.SECONDS, RedisDelayedQueueListener.TEST_LISTENER);
log.info("延时测试, 5秒后触发");
return "延时测试, 5秒后触发";
}
}
7 测试
项目启动

image-20210420162839457
测试接口调用

image-20210420162909751
成功延时触发

image-20210420162957872