1 使用 IDEA 新建项目

pom.xml 配置文件如下:

xml
  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
  • 09
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Spring Boot parent: supplies managed dependency and plugin versions. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.14.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Coordinates of this demo project. -->
    <groupId>xyz.foraixh</groupId>
    <artifactId>redis-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>redis-demo</name>
    <description>redis-demo</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Web starter: needed for the REST test controller. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.redisson/redisson-spring-boot-starter -->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.15.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2 redis相关配置项

yml
  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
# Redis connection settings (spring.redis.*).
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    timeout: 5000
    # Leave the password unset when the Redis server has none.
    # password: password

3 定义延时队列消费接口

java
  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
/**
 * Callback contract for a delayed-queue consumer.
 *
 * <p>The interface adds no methods of its own: an implementation handles an element
 * through the {@code accept} method inherited from {@link Consumer}. Queue names are
 * declared here as string constants.
 *
 * @param <T> type of the element passed to the listener
 */
public interface RedisDelayedQueueListener<T> extends Consumer<T> {

    /** Name of the delayed queue used for testing. */
    String TEST_LISTENER = "TEST_LISTENER";
}

3.1 定义延时队列测试消费者实现类

java
  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
  • 09
/**
 * Test listener for the delayed queue: writes every received element to the log.
 *
 * <p>The Spring bean name is set to {@link RedisDelayedQueueListener#TEST_LISTENER}.
 */
@Component(RedisDelayedQueueListener.TEST_LISTENER)
public class TestListener implements RedisDelayedQueueListener<String> {

    private static final Logger log = LoggerFactory.getLogger(TestListener.class);

    /**
     * Logs the received element at INFO level.
     *
     * @param message element handed to this listener
     */
    @Override
    public void accept(String message) {
        log.info("listener test: {}", message);
    }
}

4 定义redis延时队列

java
  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
  • 09
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
/**
 * Producer-side helper that puts elements on a Redisson delayed queue.
 */
public class RedisDelayedQueue {

    private static final Logger log = LoggerFactory.getLogger(RedisDelayedQueue.class);

    private final RedissonClient redissonClient;

    /**
     * @param redissonClient Redisson client used to obtain the queues
     */
    public RedisDelayedQueue(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    /**
     * Offers an element to the delayed queue that feeds the named blocking queue.
     *
     * @param t         element to enqueue
     * @param delay     amount of time to delay the element
     * @param timeUnit  unit of {@code delay}
     * @param queueName name of the destination blocking queue
     * @param <T>       element type
     */
    public <T> void addItem(T t, long delay, TimeUnit timeUnit, String queueName) {
        RBlockingQueue<T> destination = redissonClient.getBlockingQueue(queueName);
        redissonClient.getDelayedQueue(destination).offer(t, delay, timeUnit);
    }
}

5 定义延时队列启动配置

配置延时队列专用线程池

java
  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
  • 09
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
/**
 * Declares the thread pool dedicated to delayed-queue consumers.
 */
@Configuration
public class RedisThreadPoolConfiguration {

    /**
     * Creates a fixed pool of 10 threads backed by an unbounded work queue.
     *
     * <p>The destroy method is set explicitly to {@code shutdownNow}. Left to inference,
     * Spring would call {@code shutdown()}, which does not interrupt tasks that are
     * already running, so a task blocked forever on a queue would keep its non-daemon
     * worker thread alive after the application context is closed.
     *
     * <p>NOTE(review): core size equals max size and the work queue is unbounded, so a
     * task submitted while all 10 workers are busy with never-ending tasks stays queued
     * and never runs. Confirm the number of long-running consumers stays at or below 10.
     *
     * @return executor service for delayed-queue consumer tasks
     */
    @Bean(destroyMethod = "shutdownNow")
    public ExecutorService redisDelayedQueueConsumerPool() {
        return new ThreadPoolExecutor(
                10,
                10,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new DefaultThreadFactory("redis-delayed-queue-consumer-pool"));
    }

    /**
     * Thread factory with a configurable thread-name prefix.
     *
     * <p>Threads are named {@code <prefix>-pool-<poolNo>-thread-<threadNo>}, are always
     * non-daemon and always run at {@link Thread#NORM_PRIORITY}.
     */
    public static class DefaultThreadFactory implements ThreadFactory {

        /** Counter shared by all factories; gives each factory its pool number. */
        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);

        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        /**
         * @param prefix text placed in front of the generated thread names
         */
        DefaultThreadFactory(String prefix) {
            // Use the security manager's thread group when one is installed,
            // otherwise the group of the thread creating this factory.
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            namePrefix = prefix + "-pool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
        }

        /**
         * Creates a new non-daemon, normal-priority thread for the given task.
         *
         * @param r task the thread will run
         * @return the newly created, not yet started thread
         */
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            // A new thread inherits daemon status and priority from its creator; reset both.
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
}

配置延时队列初始化

java
  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
  • 09
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
@Configuration public class RedisConfiguration implements ApplicationContextAware { private static final Logger log = LoggerFactory.getLogger(RedisConfiguration.class); private final RedissonClient redissonClient; @Qualifier("redisDelayedQueueConsumerPool") private final ExecutorService redisDelayedQueueConsumerPool; public RedisConfiguration(RedissonClient redissonClient, ExecutorService redisDelayedQueueConsumerPool) { this.redissonClient = redissonClient; this.redisDelayedQueueConsumerPool = redisDelayedQueueConsumerPool; } @Bean public RedisDelayedQueue redisDelayedQueue() { return new RedisDelayedQueue(redissonClient); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { applicationContext.getBeansOfType(RedisDelayedQueueListener.class).forEach(this::listenerStart); } /** * 启动线程获取队列* * @param queueName queueName * @param redisDelayedQueueListener 任务回调监听 * @param <T> 泛型 */ private <T> void listenerStart(String queueName, RedisDelayedQueueListener<T> redisDelayedQueueListener) { RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName); redisDelayedQueueConsumerPool.execute(() -> { T t; while (true) { try { t = blockingFairQueue.take(); redisDelayedQueueListener.accept(t); } catch (InterruptedException e) { log.error("处理错误", e); } } }); } }

6 测试controller

java
  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
  • 09
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
/**
 * REST endpoint for trying out the delayed queue by hand.
 */
@RestController
public class TestController {

    private static final Logger log = LoggerFactory.getLogger(TestController.class);

    private final RedisDelayedQueue redisDelayedQueue;

    /**
     * @param redisDelayedQueue producer-side helper used to enqueue the test element
     */
    public TestController(RedisDelayedQueue redisDelayedQueue) {
        this.redisDelayedQueue = redisDelayedQueue;
    }

    /**
     * Enqueues a test element with a 5-second delay on the test queue.
     *
     * @return confirmation text, identical to the line written to the log
     */
    @GetMapping("/redisDelayQueueTest")
    public String redisDelayQueueTest() {
        final String reply = "延时测试, 5秒后触发";
        redisDelayedQueue.addItem("延时测试", 5, TimeUnit.SECONDS, RedisDelayedQueueListener.TEST_LISTENER);
        log.info(reply);
        return reply;
    }
}

7 测试

项目启动

调用测试接口(GET /redisDelayQueueTest)

约 5 秒后 TestListener 输出日志,延时触发成功

8 完整项目见 GitHub

https://github.com/my-vina/redis-demo