菜鸟带你基于redis实现消息队列

1,018 阅读10分钟

背景介绍

消息队列在互联网领域得到了广泛应用,主要用于异步处理、模块间解耦和高并发系统的流量削峰等场景中,开源中比较优秀的消息队列有ActiveMq、RocketMq 和Kafka等。本文章主要是基于redis实现消息队列,如有错误,敬请大牛指导,我只是一个在前进中的rookie。基于redis实现消息队列,首先需要了解下面几个redis的命令,命令来源于redis官方

redis命令

PUSH

  • RPUSH 关键字 元素[元素...]
自1.0.0起可用。
时间复杂度:每个添加的元素为O(1),因此当使用多个参数调用命令时,O(N)要添加N个元素
Insert all the specified values at the tail of the list stored at key. If key does not exist, it is created as empty list before performing the push operation. When key holds a value that is not a list, an error is returned.
It is possible to push multiple elements using a single command call just specifying multiple arguments at the end of the command. Elements are inserted one after the other to the tail of the list, from the leftmost element to the rightmost element. So for instance the command RPUSH mylist a b c will result into a list containing a as first element, b as second element and c as third element.

return value

操作后链表整个长度

History

>= 2.4: Accepts multiple element arguments. In Redis versions older than 2.4 it was possible to push a single value per command.

example

redis> RPUSH mylist "hello"
(integer) 1
redis> RPUSH mylist "world"
(integer) 2
redis> LRANGE mylist 0 -1
1) "hello"
2) "world"
redis> 
  • LPUSH
自1.0.0起可用。
时间复杂度:每个添加的元素为O(1),因此当使用多个参数调用命令时,O(N)要添加N个元素
Insert all the specified values at the head of the list stored at key. If key does not exist, it is created as empty list before performing the push operations. When key holds a value that is not a list, an error is returned.
It is possible to push multiple elements using a single command call just specifying multiple arguments at the end of the command. Elements are inserted one after the other to the head of the list, from the leftmost element to the rightmost element. So for instance the command LPUSH mylist a b c will result into a list containing c as first element, b as second element and a as third element.

意思是从链表的左边插入,比如LPUSH a b c ,则结果应该是这样 c b a

examples

redis> LPUSH mylist "world"
(integer) 1
redis> LPUSH mylist "hello"
(integer) 2
redis> LRANGE mylist 0 -1
1) "hello"
2) "world"
redis> 
  • LPOP
Removes and returns the first element of the list stored at key.

return value

Bulk string reply: the value of the first element, or nil when key does not exist.

意思当key存在时,删除并返回第一个值,如果key不存在时,返回nil,也就是暗含非阻塞

examples

redis> RPUSH mylist "one"
(integer) 1
redis> RPUSH mylist "two"
(integer) 2
redis> RPUSH mylist "three"
(integer) 3
redis> LPOP mylist
"one"
redis> LRANGE mylist 0 -1
1) "two"
2) "three"
redis> lpop mynolist
(nil)
  • RPOP

    与LPOP相反。

  • BLPOP 键[键...] 超时

BLPOP is a blocking list pop primitive. It is the blocking version of LPOP because it blocks the connection when there are no elements to pop from any of the given lists. An element is popped from the head of the first list that is non-empty, with the given keys being checked in the order that they are given.

阻塞获取value,可以多个键 BLPOP key1 key2 key3 10 只要有一个其中一个有值,就不会阻塞,当 key1和key2都存在值,则优先返回key1值

return value

1、nil当无法弹出任何元素且超时已过期
2、一个由两个元素组成的多体,第一个元素是弹出元素的键的名称,第二个元素是弹出元素的值。

examples

redis> DEL list1 list2
(integer) 0
redis> RPUSH list1 a b c
(integer) 3
redis> BLPOP list1 list2 0
1) "list1"
2) "a"
  • RLPOP 键[键...] 超时

与BLPOP 键[键...] 超时 原理基本相同。

官方文档中在讲这个命令时,提到的可靠消息队列。所以猜测基于redis实现消息队列应该用到了这个命令

Reliable queues

When BLPOP returns an element to the client, it also removes the element from the list. This means that the element only exists in the context of the client: if the client crashes while processing the returned element, it is lost forever.

This can be a problem with some application where we want a more reliable messaging system. When this is the case, please check the BRPOPLPUSH command, that is a variant of BLPOP that adds the returned element to a target list before returning it to the client.

大概意思是消费者消费消息后,redis会清除掉此消息,那么这条信息只会保留在应用端上下文中,如果消费者在消费过程中出现异常,导致消费没有被正常的消费,那么这条消息就被彻底的丢失,对于一些需要更高的可高性的系统来说确实 是一个问题,在这种情况下可以查看BRPOPLPUSH 命令,这个命令是BLPOP的一个变体,在返回消费端信息前会将信息存在链表中。

  • RPOPLPUSH source destination

BRPOPLPUSH是RPOPLPUSH的一个变体,原理差不多,这里简单介绍下RPOPLPUSH

Atomically returns and removes the last element (tail) of the list stored at source, and pushes the element at the first element (head) of the list stored at destination.

For example: consider source holding the list a,b,c, and destination holding the list x,y,z. Executing RPOPLPUSH results in source holding a,b and destination holding c,x,y,z.

If source does not exist, the value nil is returned and no operation is performed. If source and destination are the same, the operation is equivalent to removing the last element from the list and pushing it as first element of the list, so it can be considered as a list rotation command.

描述已经很清楚了,这是一个原子操作从souece中获取元素时会保存到destination中

examples

redis> RPUSH mylist "one"
(integer) 1
redis> RPUSH mylist "two"
(integer) 2
redis> RPUSH mylist "three"
(integer) 3
redis> RPOPLPUSH mylist myotherlist
"three"
redis> LRANGE mylist 0 -1
1) "one"
2) "two"
redis> LRANGE myotherlist 0 -1
1) "three"
redis> 

关于redis的命令暂时就介绍这几个,下面会用到这几个。了解reids更多命令redis.io.command

设计目标

简单易用

提供简单明了的API,提供相应的消费端接口,消费者只需要首先RedisMessageHandler接口或者相关子类,在实现中完成消息的消费,同时提供消息到领域模型的转换方法,使使用者将更多时间投入到业务开发中。

高性能

通过线程池技术来保证高性能,同时提供同步处理机制 (适合轻量级服务)、异步线程模型(适合IO密集型/执行比较时间长的服务)

设计难点

线程模型

提供同步线程模型和异步线程模型两种模型,使用者根据场景选择

  • 同步线程模型

每个消费者使用一个消费线程,这个消费线程包括两个处理逻辑,一是从redis消费消息,二是处理业务逻辑,即消费线程与业务逻辑处理是用一个线程,这种模型下不适合执行时间过长的业务。如果执行时间过长,可能会导致redis堆积大量消息。

  • 异步线程模型

这种模型与同步线程模型相反,消费线程与处理线程相互分离,一个线程从redis消费,但是不处理业务逻辑,业务逻辑在worker线程中处理,与netty中的boss线程和worker线程相似。这种模型适合IO密集型任务,不会使redis堆积大量消息,因为消费线程不会阻塞。

核心实现

生产者设计与实现

生产者对发送发送逻辑进行封装,用户通过指定消息队列key,将某个领域对象或者消息发送到redis,领域模型通过json自动转换为消息发送。

    public <T> void setBeanToRedis(String key, T bean) {
        JedisUtis.setLPush(key, JSON.toJSONString(bean));
    }

消费者设计与实现

消费者是对从redis获取消息的包装,用户通过指定redis的key,不断的从该key获取消息,交给对应的handler去消费消息;消费者的实现相比生产者要复杂一些,消费者提供了同步和异步消费消息,包括消息消费处理异常等

  • 通过构造方法指定线程池大小以及是否异步
    public RedisQueueConsumer(int minTheadNum, int maxTheadNum, MessageHandler handler, String key) {
        this.handler = handler;
        this.key = key;
        workerThreadPool = new ThreadPoolExecutor(minTheadNum, maxTheadNum, 6000, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
        bossTheadPool = Executors.newFixedThreadPool(5);
    }

    public RedisQueueConsumer(int minTheadNum, int maxTheadNum, MessageHandler handler, String key, boolean async) {
        this(minTheadNum, maxTheadNum, handler, key);
        this.async = async;
    }
  • 消费接口
public interface MessageHandler {
    void execute(String message);
}

提供顶层抽象方法,尽可能的将公共逻辑封装在抽象逻辑中,使用户只实现消息消费逻辑即可,分别提供对领域模型和string抽象

/**
 * @author Qi.qingshan
 * @date 2020/5/5
 */
public abstract class AbstractMessageHandler implements MessageHandler {

    private static final Logger log = LoggerFactory.getLogger(AbstractMessageHandler.class);

    //消息处理
    public void execute(String message) {
        try {
            doExecute(message);
        } catch (Exception e) {
            // todo handle exception
            e.printStackTrace();
        }
    }

    protected abstract void doExecute(String message);
}

对消息到领域模型自动转换

/**
 * @author Qi.qingshan
 * @date 2020/5/5
 */
public abstract class AbstractBeanMessageHandler<T> extends AbstractMessageHandler{

    private static final Logger log = LoggerFactory.getLogger(AbstractBeanMessageHandler.class);

    private Class<T> clazz;

    public AbstractBeanMessageHandler(Class<T> clazz){
        this.clazz = clazz;
    }

    @Override
    protected void doExecute(String message) {
        doExecute(JSON.parseObject(message,clazz));
    }

    protected abstract void doExecute(T bean);

}
  • 消息消费

抽象中实现从redis获取消息

    abstract class AbstractMessageTask implements Runnable {

        private String queueKey;

        public AbstractMessageTask(String queueKey, MessageHandler handler) {
            this.queueKey = queueKey;
        }

        public void run() {
            while (status == Status.RUNNING) {
                String message = JedisUtis.getLPop(queueKey);
                handleMessage(message);
            }
        }

        protected abstract void handleMessage(String message);
    }

实现异步消费逻辑,这块需要优化,不符合职责单一设计原则,将异步与同步进行分离,在不同的task实现

    public class AsyncMessageTask extends AbstractMessageTask {

        private MessageHandler handler;

        public AsyncMessageTask(String queueKey, MessageHandler handler) {
            super(queueKey, handler);
            this.handler = handler;
        }

        @Override
        protected void handleMessage(final String message) {
            if (async) {
                workerThreadPool.submit(new Runnable() {
                    @Override
                    public void run() {
                        handler.execute(message);
                    }
                });
            } else {
                handler.execute(message);
            }
        }
    }
  • 消息推送

redis中是通过LPUSH实现消息推送与LPOP获取消息,上面介redis绍命令时,还特意提了下,后边需要优化

    public static long setLPush(String key, String... message) {
        Jedis jedis = getJedis();
        try {
            Long lpush = jedis.lpush(key, message);
            return lpush;
        } finally {
            close(jedis);
        }
    }
    <span class="hljs-function" style="line-height: 26px;"><span class="hljs-keyword" style="color: #a626a4; line-height: 26px;">public</span> <span class="hljs-keyword" style="color: #a626a4; line-height: 26px;">static</span> String <span class="hljs-title" style="color: #4078f2; line-height: 26px;">getLPop</span><span class="hljs-params" style="line-height: 26px;">(String key)</span> </span>{
    Jedis jedis = getJedis();
    <span class="hljs-keyword" style="color: #a626a4; line-height: 26px;">try</span> {
        String value = jedis.lpop(key);
        <span class="hljs-keyword" style="color: #a626a4; line-height: 26px;">return</span> value;
    } <span class="hljs-keyword" style="color: #a626a4; line-height: 26px;">finally</span> {
        close(jedis);
    }
}

代码不是很严谨,基本功能已经实现,可以测试下

场景测试

生产者

/**
 * @author Qi.qingshan
 * @date 2020/5/5
 */
public class RedisQueueProducerTest {

    private static final String KEY = "key";

    public static void main(String[] args) {

        RedisQueueProducer producer = new RedisQueueProducer();
        for(int i = 0; i < 10 ; i++){
            producer.send(KEY, "message :: " + i);
        }
    }
}

消费者

实现对应的消息处理器

/**
 * @author Qi.qingshan
 * @date 2020/5/5
 */
public class StringMessageHandler extends AbstractMessageHandler {
    @Override
    protected void doExecute(String message) {
        //执行业务逻辑
        try {
            TimeUnit.SECONDS.sleep(2);
            System.out.println(Thread.currentThread().getName() + " - "+ "accept message -> " + message);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
/**
 * @author Qi.qingshan
 * @date 2020/5/5
 */
public class RedisQueueConsumerTest {

    private static final String KEY = "key";

    public static void main(String[] args) {
        StringMessageHandler handler = new StringMessageHandler();
        RedisQueueConsumer consumer = new RedisQueueConsumer(10,40,handler ,KEY,true);
        consumer.startup();
    }
}

使用异步线程模型消费消息

启动消费者,首先通过线程名可以确实是异步线程池,获取的消费为null是因为生产者没启动,可见LPOP并非是阻塞,没有消息,直接返回null

启动消费者,可以发现正常获取消息了

如果不考虑其它因素,功能是实现了,但是空消费导致服务器资源浪费,浪费CPU,并且从在可靠性问题,redis官方建议使用RPOPLPUSH, 上边已经讲过这个命令,简单调整获取消息方法

public static String getLPop(String key) {
        Jedis jedis = getJedis();
        try {
            String value = jedis.brpoplpush(key,"key_back" ,1000);
            return value;
        } finally {
            close(jedis);
        }
    }

启动消费者, 没有收到消息

启动生产者,发现消费者消费到了消息

看下redis上对应的key_back还有没有消息

发现消息确实备份了,这样再业务处理异常后不会导致消息彻底的丢失。

结束语

功能基本实现,主要介绍了原理,还有好多细节没处理,比如优雅停机,异常处理等,后续会不断优化,同时适配spring boot等,花了整整一天时间,原创不易,欢迎指导,胳膊疼.......休息哈^_^