1 使用 IDEA 新建项目

pom.xml 配置文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Maven build descriptor for the redis-demo sample: a Spring Boot web application
         that uses the Redisson Spring Boot starter as its Redis client. -->
    <modelVersion>4.0.0</modelVersion>
    <!-- Inherit dependency and plugin management from the Spring Boot parent POM. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.14.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>xyz.foraixh</groupId>
    <artifactId>redis-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>redis-demo</name>
    <description>redis-demo</description>
    <properties>
        <!-- Java language level used for compilation. -->
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!-- Spring MVC and embedded web server; the version is managed by the parent POM. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.redisson/redisson-spring-boot-starter -->
        <!-- Redisson client; the version is pinned here because it is declared explicitly.
             NOTE(review): this starter release may be built against a newer Spring Boot /
             Spring Data Redis line than 2.1.x. Redisson's starter documentation describes
             swapping the bundled redisson-spring-data module for the one matching the
             Spring Boot version in use; confirm whether that is needed for this project. -->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.15.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot's Maven plugin; its version comes from the parent POM. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2 redis相关配置项

# Redis connection settings under the spring.redis prefix.
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    # NOTE(review): no unit is given; presumably interpreted as milliseconds - confirm
    # against the Spring Boot version in use.
    timeout: 5000
    # Optional: leave unset when the Redis server has no password.
    # password: password

3 定义延时队列消费接口

/**
 * Callback invoked for every element taken from a delayed queue.
 *
 * <p>The only abstract method is {@link Consumer#accept(Object)}; the annotation below makes
 * the compiler enforce that, so implementations may be classes, lambdas or method references.
 * Queue-name constants are declared here so producers and listeners share one definition.</p>
 *
 * @param <T> type of the element delivered to the listener
 */
@FunctionalInterface
public interface RedisDelayedQueueListener<T> extends Consumer<T> {
    /**
     * Name of the test delayed queue.
     */
    String TEST_LISTENER = "TEST_LISTENER";
}

3.1 定义延时队列测试消费者实现类

/**
 * Sample listener registered as a Spring bean named
 * {@link RedisDelayedQueueListener#TEST_LISTENER}.
 *
 * <p>It performs no business logic: each received payload is written to the log.</p>
 */
@Component(RedisDelayedQueueListener.TEST_LISTENER)
public class TestListener implements RedisDelayedQueueListener<String> {
    private static final Logger log = LoggerFactory.getLogger(TestListener.class);

    /**
     * Logs the received payload at INFO level.
     *
     * @param payload element handed to this listener
     */
    @Override
    public void accept(String payload) {
        log.info("listener test: {}", payload);
    }
}

4 定义redis延时队列

/**
 * Producer-side helper that places elements on a Redisson delayed queue.
 */
public class RedisDelayedQueue {
    private static final Logger log = LoggerFactory.getLogger(RedisDelayedQueue.class);

    private final RedissonClient redissonClient;

    /**
     * @param redissonClient client used to look up the queues
     */
    public RedisDelayedQueue(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    /**
     * Offers an element to the delayed queue that wraps the blocking queue {@code queueName}.
     *
     * @param t         element to enqueue
     * @param delay     amount of time to delay the element
     * @param timeUnit  unit of {@code delay}
     * @param queueName name of the destination blocking queue
     * @param <T>       element type
     */
    public <T> void addItem(T t, long delay, TimeUnit timeUnit, String queueName) {
        RBlockingQueue<T> destination = redissonClient.getBlockingQueue(queueName);
        redissonClient.getDelayedQueue(destination).offer(t, delay, timeUnit);
    }
}

5 定义延时队列启动配置

配置延时队列专用线程池

/**
 * Declares the thread pool dedicated to delayed-queue consumers.
 */
@Configuration
public class RedisThreadPoolConfiguration {
    /**
     * Fixed pool of 10 non-daemon threads backed by an unbounded work queue.
     *
     * <p>The tasks submitted to this pool are consumer loops that block while waiting for
     * queue elements. A plain {@code shutdown()} never interrupts running tasks, so the
     * destroy method is set to {@code shutdownNow} to interrupt the consumers when the
     * application context closes.</p>
     *
     * <p>Because each consumer loop occupies a thread for as long as it runs, at most 10
     * listeners can be active at once; further ones wait in the work queue.</p>
     *
     * @return the executor used to run delayed-queue consumer loops
     */
    @Bean(destroyMethod = "shutdownNow")
    public ExecutorService redisDelayedQueueConsumerPool() {
        return new ThreadPoolExecutor(10, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new DefaultThreadFactory("redis-delayed-queue-consumer-pool"));
    }

    /**
     * Thread factory that lets the caller choose the thread-name prefix.
     *
     * <p>Threads are named {@code <prefix>-pool-<poolNumber>-thread-<threadNumber>}, are
     * never daemon threads and always run at {@link Thread#NORM_PRIORITY}.</p>
     */
    public static class DefaultThreadFactory implements ThreadFactory {
        /** Counter shared by all factories; numbers the pools in creation order. */
        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
        private final ThreadGroup group;
        /** Per-factory counter; numbers the threads created by this factory. */
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        /**
         * @param prefix leading part of every thread name produced by this factory
         */
        DefaultThreadFactory(String prefix) {
            // Use the security manager's thread group when one is installed,
            // otherwise the group of the thread creating this factory.
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = prefix + "-pool-" +
                    POOL_NUMBER.getAndIncrement() +
                    "-thread-";
        }

        /**
         * Creates a named, non-daemon, normal-priority thread for {@code r}.
         *
         * @param r task the new thread will run
         * @return the new, not yet started thread
         */
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            // A new thread inherits daemon status and priority from its creator; normalise both.
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
}

配置延时队列初始化

/**
 * Wires the delayed-queue producer bean and starts one consumer loop for every
 * {@link RedisDelayedQueueListener} bean found in the application context.
 */
@Configuration
public class RedisConfiguration implements ApplicationContextAware {
    private static final Logger log = LoggerFactory.getLogger(RedisConfiguration.class);

    private final RedissonClient redissonClient;
    private final ExecutorService redisDelayedQueueConsumerPool;

    /**
     * @param redissonClient                client used to access the queues
     * @param redisDelayedQueueConsumerPool executor that runs the consumer loops; the
     *                                      qualifier is placed on the parameter because a
     *                                      qualifier on a constructor-injected field is not
     *                                      consulted during injection
     */
    public RedisConfiguration(RedissonClient redissonClient,
                              @Qualifier("redisDelayedQueueConsumerPool") ExecutorService redisDelayedQueueConsumerPool) {
        this.redissonClient = redissonClient;
        this.redisDelayedQueueConsumerPool = redisDelayedQueueConsumerPool;
    }

    /**
     * @return the producer-side helper used to enqueue delayed elements
     */
    @Bean
    public RedisDelayedQueue redisDelayedQueue() {
        return new RedisDelayedQueue(redissonClient);
    }

    /**
     * Starts a consumer for every listener bean; the bean name is used as the queue name.
     *
     * @param applicationContext context from which the listener beans are looked up
     * @throws BeansException if the listener beans cannot be obtained
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        applicationContext.getBeansOfType(RedisDelayedQueueListener.class).forEach(this::listenerStart);
    }

    /**
     * Submits a consumer loop that takes elements from the queue and hands them to the listener.
     *
     * <p>The loop ends when its thread is interrupted. An exception thrown by the listener is
     * logged and the loop moves on to the next element, so one bad element does not stop
     * consumption of the queue.</p>
     *
     * @param queueName                 name of the blocking queue to consume
     * @param redisDelayedQueueListener callback invoked for each element
     * @param <T>                       element type
     */
    private <T> void listenerStart(String queueName, RedisDelayedQueueListener<T> redisDelayedQueueListener) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        // Obtain the delayed-queue handle up front. Redisson moves expired elements into the
        // destination queue through a client-side transfer task that is registered when this
        // handle is created; without it, elements scheduled before a restart would stay
        // undelivered until the next addItem call.
        redissonClient.getDelayedQueue(blockingFairQueue);

        redisDelayedQueueConsumerPool.execute(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                T item;
                try {
                    item = blockingFairQueue.take();
                } catch (InterruptedException e) {
                    // Restore the flag and stop: interruption is how the pool asks us to exit.
                    Thread.currentThread().interrupt();
                    log.info("delayed queue consumer [{}] interrupted, stopping", queueName);
                    return;
                }
                try {
                    redisDelayedQueueListener.accept(item);
                } catch (RuntimeException e) {
                    log.error("处理错误", e);
                }
            }
        });
    }
}

6 测试controller

/**
 * Demo endpoint that schedules a test message on the delayed queue.
 */
@RestController
public class TestController {
    private static final Logger log = LoggerFactory.getLogger(TestController.class);

    private final RedisDelayedQueue redisDelayedQueue;

    /**
     * @param redisDelayedQueue producer used to enqueue the test message
     */
    public TestController(RedisDelayedQueue redisDelayedQueue) {
        this.redisDelayedQueue = redisDelayedQueue;
    }

    /**
     * Enqueues a fixed test message with a 5-second delay on the
     * {@link RedisDelayedQueueListener#TEST_LISTENER} queue.
     *
     * @return the confirmation text, which is also written to the log
     */
    @GetMapping("/redisDelayQueueTest")
    public String redisDelayQueueTest() {
        final String reply = "延时测试, 5秒后触发";
        redisDelayedQueue.addItem("延时测试", 5L, TimeUnit.SECONDS, RedisDelayedQueueListener.TEST_LISTENER);
        log.info(reply);
        return reply;
    }
}

7 测试

项目启动

image-20210420162839457

测试接口调用

image-20210420162909751

成功延时触发

image-20210420162957872

8 完整项目见github

https://github.com/my-vina/redis-demo