靠得住的「小火」RocketMQ

1,199 阅读23分钟


单机一万以上队列

亿级消息堆积能力

RocketMQ天生为金融互联网领域而生,追求高可靠、高可用、高并发、低延迟

RocketMQ在阿里集团也被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景

功能

发布/订阅消息传递模型

财务级交易消息

各种跨语言客户端,例如Java,C / C ++,Python,Go

可插拔的传输协议,例如TCP,SSL,AIO

内置的消息跟踪功能,还支持开放式跟踪

多功能的大数据和流生态系统集成

按时间或偏移量追溯消息

可靠的FIFO和严格的有序消息传递在同一队列中

高效的推拉消费模型

单个队列中的百万级消息累积容量

多种消息传递协议,例如JMS和OpenMessaging

灵活的分布式横向扩展部署架构

快如闪电的批量消息交换系统

各种消息过滤器机制,例如SQL和Tag

用于隔离测试和云隔离群集的Docker映像

功能丰富的管理仪表板,用于配置,指标和监视

认证与授权

来源: juejin.cn/post/684490…

版本变化

2007-2015年期间:

2015-2020年期间:


架构


工作流程:

  • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。

  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。

  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。

  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。

  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

背景

Kafka不能满足我们的要求,特别是在低延迟和高可靠性方面

在使用越来越多的队列和virtual Topic的情况下,ActiveMQ IO模块遇到了瓶颈。我们尽力通过节流,断路器或降级来解决此问题,但效果不佳。

ActiveMQ vs Kafka vs RocketMQ

消息产品客户端SDK协议和规范有序消息定时消息批处理消息广播消息消息过滤服务器触发的重新交付消息储存消息追溯消息优先级高可用性和故障转移消息追踪配置管理和操作工具
ActiveMQJava,.NET,C ++等推送模型,支持OpenWire,STOMP,AMQP,MQTT,JMS排他消费者或排除队列可以确保顺序××使用JDBC以及高性能日志(例如levelDB,kahaDB)支持非常快速的持久性受支持,取决于存储,如果使用kahadb,则需要ZooKeeper服务器×默认配置为低级别,用户需要优化配置参数
KafkaJava,Scala等拉模型,支持TCP分区内确保消息的顺序×√ 带有异步生成器×√ 可以使用Kafka Streams过滤消息×高性能文件存储支持的偏移量指示×受支持,需要ZooKeeper服务器×Kafka使用键值对格式进行配置。这些值可以从文件或以编程方式提供。√ 使用终端命令公开核心指标
RocketMQJava,C ++,Go拉模型,支持TCP,JMS,OpenMessaging确保消息的严格排序,并可以正常扩展√ 具有同步模式,可避免消息丢失支持的基于SQL92的属性过滤器表达式高性能和低延迟文件存储支持的时间戳和偏移量两个表示×受支持的主从模式,无需其他套件开箱即用,用户只需要注意一些配置√ 富Web和终端命令可显示核心指标

环境要求

64位操作系统

64位 jdk1.8+

maven3.2.x

4g以上空闲硬盘

快速入门

linux

下载编译并解压:

wget https://archive.apache.org/dist/rocketmq/4.7.0/rocketmq-all-4.7.0-source-release.zip
unzip rocketmq-all-4.7.0-source-release.zip
cd rocketmq-all-4.7.0/
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0

启动name server:

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log

启动broker:

nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log 

发送和接收消息:

告诉客户端name server位置,设置环境变量:export NAMESRV_ADDR=localhost:9876

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

关闭服务器

sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

windows

下载:archive.apache.org/dist/rocket…

解压到:D:\rocketmq

设置环境变量:

ROCKETMQ_HOME="D:\rocketmq"
NAMESRV_ADDR="localhost:9876"

启动name server:

.\bin\mqnamesrv.cmd

启动broker:

.\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

发送和接收消息:

.\bin\tool.cmd  org.apache.rocketmq.example.quickstart.Producer
.\bin\tool.cmd  org.apache.rocketmq.example.quickstart.Consumer

简单示例

本示例包含三种方式发送消息:可靠同步发送,可靠的异步发送,单向传输

使用RocketMQ接收消息

maven依赖:

<dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.3.0</version>
    </dependency>

同步发送:

在重要的通知消息,SMS通知,SMS营销系统等广泛的场景中使用可靠的同步传输。

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
            DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

异步发送:

异步传输通常用于对时间敏感的业务场景中。

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        
        int messageCount = 100;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            try {
                final int index = i;
                Message msg = new Message("Jodie_topic_1023",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}

单向传输模式

单向传输用于要求中等可靠性的情况,也就是非高可靠,例如日志收集。

public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);
        }
        //Wait for sending to complete
        Thread.sleep(5000);        
        producer.shutdown();
    }
}

另外,您可以从以下网址获取更多示例:github.com/apache/rock…

核心概念


根据上述模型,我们可以更深入地探讨有关消息系统设计的一些主题:

  • 消费者并发

  • 消费者热点

  • 消费者负载平衡

  • 消息路由

  • 连接复用

  • 金丝雀部署

消息模型

  • Clustering

  • Broadcasting

消息顺序

Broker

consumer

producer

name server

Tag

消息队列Queue

消息

主题Topic

consumer group

producer group

pull consumer

push consumer

概念详解见最下方

有序消息

RocketMQ使用FIFO顺序提供有序消息。

下面的示例演示了全局内和分区内有序消息的发送/接收。

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        MQProducer producer = new DefaultMQProducer("example_group_name");
        //Launch the instance.
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 100; i++) {
            int orderId = i % 10;
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }
            }, orderId);

            System.out.printf("%s%n", sendResult);
        }
        //server shutdown
        producer.shutdown();
    }
}
public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {

            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                       ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((this.consumeTimes.get() % 3) == 0) {
                    return ConsumeOrderlyStatus.ROLLBACK;
                } else if ((this.consumeTimes.get() % 4) == 0) {
                    return ConsumeOrderlyStatus.COMMIT;
                } else if ((this.consumeTimes.get() % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;

            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

广播消息

【广播】是向订阅了某个主题的所有订阅者都发送一条消息。如果希望所有订阅者都收到有关主题的消息,那么广播是一个不错的选择。

public class BroadcastProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();

        for (int i = 0; i < 100; i++){
            Message msg = new Message("TopicTest",
                "TagA",
                "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}
public class BroadcastConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //set to broadcast mode
        consumer.setMessageModel(MessageModel.BROADCASTING);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}

定时消息

定时消息与普通消息的不同之处在于,它们要等到指定的时间之后才发送。

启动消费者等待消息到来:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.common.message.MessageExt;
 import java.util.List;
    
 public class ScheduledMessageConsumer {
    
     public static void main(String[] args) throws Exception {
         // Instantiate message consumer
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
         // Subscribe topics
         consumer.subscribe("TestTopic", "*");
         // Register message listener
         consumer.registerMessageListener(new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                 for (MessageExt message : messages) {
                     // Print approximate delay time period
                     System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                             + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                 }
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         });
         // Launch consumer
         consumer.start();
     }
 }

发送定时消息:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.message.Message;
    
 public class ScheduledMessageProducer {
    
     public static void main(String[] args) throws Exception {
         // Instantiate a producer to send scheduled messages
         DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
         // Launch producer
         producer.start();
         int totalMessagesToSend = 100;
         for (int i = 0; i < totalMessagesToSend; i++) {
             Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
             // This message will be delivered to consumer 10 seconds later.
             message.setDelayTimeLevel(3);
             // Send the message
             producer.send(message);
         }
    
         // Shutdown producer after use.
         producer.shutdown();
     }
        
 }

为什么要批处理?

批量发送消息可以提高传递小消息的性能。

使用限制

同一批次的消息应具有:相同的主题,相同的waitStoreMsgOK,并且不支持定时发。

此外,一批消息的总大小不得超过1MiB。

如何使用批处理

如果您一次只发送不超过1MiB的消息,则可以轻松使用批处理:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
    producer.send(messages);
} catch (Exception e) {
    e.printStackTrace();
    //handle the error
}

拆分成消息列表list

当您发送大批量时,复杂性会增加,并且您可能不确定它是否超过大小限制(1MiB)。

这时,您最好拆分成消息列表list:

public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1000 * 1000;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
            this.messages = messages;
    }
    @Override public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead
            if (tmpSize > SIZE_LIMIT) {
                //it is unexpected that single message exceeds the SIZE_LIMIT
                //here just let it go, otherwise it will block the splitting process
                if (nextIndex - currIndex == 0) {
                   //if the next sublist has no element, add this one and then break, otherwise just break
                   nextIndex++;  
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
    
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}
//then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
   try {
       List<Message>  listItem = splitter.next();
       producer.send(listItem);
   } catch (Exception e) {
       e.printStackTrace();
       //handle the error
   }
}

过滤消息

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();

Message msg = new Message("TopicTest",
    tag,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));

SendResult sendResult = producer.send(msg);
   
producer.shutdown();
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

// only subsribe messages have property a, also a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

tag是一种简单好用的查询消息的方式

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

使用者将收到包含TAGA或TAGB或TAGC的消息。但是限制是,一条消息只能有一个标签,这对于复杂的情况可能不起作用。在这种情况下,您可以使用SQL表达式来过滤出消息。

原理

SQL功能可以通过发送消息时放入的属性进行一些计算。在RocketMQ定义的语法下,您可以实现一些有趣的逻辑。这是一个例子:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------

文法

RocketMQ仅定义了一些基本语法来支持此功能。您也可以轻松扩展它。

  1. 数值比较,如>>=<<=BETWEEN=;

  2. 字符比较,如=<>IN;

  3. IS NULLIS NOT NULL;

  4. 逻辑ANDORNOT

常量类型为:

  1. 数字,例如123、3.1415;

  2. 字符,例如“ abc”,必须用单引号引起来;

  3. NULL,特殊常数;

  4. 布尔,TRUEFALSE;

使用限制

只有推送使用者才能通过SQL92选择消息。该接口是:

public void subscribe(final String topic, final MessageSelector messageSelector)

追加日志

RocketMQ logappender提供log4j追加器,log4j2追加器和logback追加器以供使用,以下是配置示例。

log4j

使用log4j属性配置文件时,请进行如下配置。

log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender
log4j.appender.mq.Tag=yourTag
log4j.appender.mq.Topic=yourLogTopic
log4j.appender.mq.ProducerGroup=yourLogGroup
log4j.appender.mq.NameServerAddress=yourRocketmqNameserverAddress
log4j.appender.mq.layout=org.apache.log4j.PatternLayout
log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n

使用log4j xml配置文件时,请按此配置并添加一个异步附加程序:

<appender name="mqAppender1" class="org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender">
    <param name="Tag" value="yourTag" />
    <param name="Topic" value="yourLogTopic" />
    <param name="ProducerGroup" value="yourLogGroup" />
    <param name="NameServerAddress" value="yourRocketmqNameserverAddress"/>
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n" />
    </layout>
</appender>

<appender name="mqAsyncAppender1" class="org.apache.log4j.AsyncAppender">
    <param name="BufferSize" value="1024" />
    <param name="Blocking" value="false" />
    <appender-ref ref="mqAppender1"/>
</appender>

log4j2

使用log4j2时,请按以下步骤进行配置。如果不希望阻塞,只需为引用配置asyncAppender即可。

<RocketMQ name="rocketmqAppender" producerGroup="yourLogGroup" nameServerAddress="yourRocketmqNameserverAddress"
     topic="yourLogTopic" tag="yourTag">
    <PatternLayout pattern="%d [%p] hahahah %c %m%n"/>
</RocketMQ>

Logappender Example

ON THIS PAGELOG4JLOG4J2LOGBACK

RocketMQ logappender provides log4j appender, log4j2 appender and logback appender for bussiness to use, below are config examples.

log4j

When using log4j properties config file,config as below.

log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender
log4j.appender.mq.Tag=yourTag
log4j.appender.mq.Topic=yourLogTopic
log4j.appender.mq.ProducerGroup=yourLogGroup
log4j.appender.mq.NameServerAddress=yourRocketmqNameserverAddress
log4j.appender.mq.layout=org.apache.log4j.PatternLayout
log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n

When using log4j xml config file,config it as this and also add a async appender:

<appender name="mqAppender1" class="org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender">
    <param name="Tag" value="yourTag" />
    <param name="Topic" value="yourLogTopic" />
    <param name="ProducerGroup" value="yourLogGroup" />
    <param name="NameServerAddress" value="yourRocketmqNameserverAddress"/>
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n" />
    </layout>
</appender>

<appender name="mqAsyncAppender1" class="org.apache.log4j.AsyncAppender">
    <param name="BufferSize" value="1024" />
    <param name="Blocking" value="false" />
    <appender-ref ref="mqAppender1"/>
</appender>

log4j2

When using log4j2,config as this.If you want noneblock,just config an asyncAppender for ref.

<RocketMQ name="rocketmqAppender" producerGroup="yourLogGroup" nameServerAddress="yourRocketmqNameserverAddress"
     topic="yourLogTopic" tag="yourTag">
    <PatternLayout pattern="%d [%p] hahahah %c %m%n"/>
</RocketMQ>

logback

使用logback时,还需要asyncAppender。

<appender name="mqAppender1" class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender">
    <tag>yourTag</tag>
    <topic>yourLogTopic</topic>
    <producerGroup>yourLogGroup</producerGroup>
    <nameServerAddress>yourRocketmqNameserverAddress</nameServerAddress>
    <layout>
        <pattern>%date %p %t - %m%n</pattern>
    </layout>
</appender>

<appender name="mqAsyncAppender1" class="ch.qos.logback.classic.AsyncAppender">
    <queueSize>1024</queueSize>
    <discardingThreshold>80</discardingThreshold>
    <maxFlushTime>2000</maxFlushTime>
    <neverBlock>true</neverBlock>
    <appender-ref ref="mqAppender1"/>
</appender>

OMS开发消息示例

OMS:OpenMessaging,包括建立行业指南和消息传递,流规范,为金融,电子商务,物联网和大数据领域提供通用框架。在分布式异构环境中,设计原则是面向云,简单,灵活且独立于语言。符合这些规范将使跨所有主要平台和操作系统开发异构消息应用程序成为可能。

OMS生产者

以下示例显示了如何以同步,异步或单向传输方式将消息发送到RocketMQ代理。

public class OMSProducer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final Producer producer = messagingAccessPoint.createProducer();

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        producer.startup();
        System.out.printf("Producer startup OK%n");

        {
            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
            SendResult sendResult = producer.send(message);
            System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
        }

        {
            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            result.addListener(new PromiseListener<SendResult>() {
                @Override
                public void operationCompleted(Promise<SendResult> promise) {
                    System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
                }

                @Override
                public void operationFailed(Promise<SendResult> promise) {
                    System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
                }
            });
        }

        {
            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            System.out.printf("Send oneway message OK%n");
        }

        producer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

OMSPull消费者

使用OMS PullConsumer轮询来自指定队列的消息。

public class OMSPullConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        
        consumer.startup();
        System.out.printf("Consumer startup OK%n");

        Message message = consumer.poll();
        if (message != null) {
            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
            System.out.printf("Received one message: %s%n", msgId);
            consumer.ack(msgId);
        }

        consumer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

OMSPushConsumer

将OMS PushConsumer附加到指定的队列,并通过MessageListener使用消息

public class OMSPushConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PushConsumer consumer = messagingAccessPoint.
            createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));
        
        consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
            @Override
            public void onMessage(final Message message, final ReceivedMessageContext context) {
                System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
                context.ack();
            }
        });
        
    }
}

事务消息

什么是事务消息?

可以将其视为两阶段提交消息实现,以确保分布式系统中的最终一致性。事务性消息可确保本地事务的执行和消息的发送可以原子方式执行。

使用限制

(1)事务消息不支持定时和批处理。 (2)为了避免对单个消息进行过多的检查并导致近半队列的消息积累,我们默认将单个消息的检查数量限制为15次,但是用户可以通过更改broker配置中的参数“ transactionCheckMax”来更改此限制。,如果一条消息经过“ transactionCheckMax”次检查,则broker将丢弃此消息并默认同时打印错误日志。用户可以通过覆盖“ AbstractTransactionCheckListener”类来更改此行为。 (3)在broker配置中由参数“ transactionTimeout”确定,检查事务消息的过期时间。用户还可以在发送事务性消息时通过设置用户属性“ CHECK_IMMUNITY_TIME_IN_SECONDS”来更改此限制,该参数优先于“ transactionMsgTimeout”参数。 (4)事务消息可能被检查或消耗了不止一次。 (5)提交给用户目标主题Topic的已提交消息可能会失败。当前,它取决于日志记录。RocketMQ本身的高可用性机制可确保高可用性。如果要确保事务消息不会丢失并且事务完整性得到保证,建议使用同步双写。机制。 (6)事务性消息的生产者ID不能与其他类型的消息的生产者ID共享。与其他类型的消息不同,事务性消息允许向后查询。MQ Server通过生产者ID查询客户机。

应用

1,交易状态

事务消息有三种状态: (1)TransactionStatus.CommitTransaction:提交事务,表示允许使用者使用此消息。 (2)TransactionStatus.RollbackTransaction:回滚事务,表示该消息将被删除且不允许使用。 (3)TransactionStatus.Unknown:中间状态,表示需要MQ进行回溯以确定状态。

2,发送事务信息

(1)创建事务生产方 使用TransactionMQProducer类创建生产方客户端,并指定唯一的producerGroup,然后可以设置自定义线程池来处理检查请求。执行本地事务后,需要根据执行结果对MQ进行回复,回复状态如上节所述。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

(2)实现TransactionListener接口 当发送half消息成功时,使用“ executeLocalTransaction”方法执行本地事务。它返回上一部分中提到的三个事务状态之一。 “ checkLocalTransaction”方法用于检查本地事务状态并响应MQ检查请求。它还返回上一部分提到的三个事务状态之一。

import ...

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

概念详解

  • Producer Group 用来表示一个収送消息应用,一个 Producer Group 下包含多个 Producer 实例,可以是多台机器,也可以 是一台机器的多个进程,或者一个进程的多个 Producer 对象。一个 Producer Group 可以収送多个 Topic 消息,Producer Group 作用如下:

  1. 标识一类 Producer

  2. 可以通过运维工具查询这个发送消息应用下有多个 Producer 实例

  3. 収送分布式事务消息时,如果 Producer 中途意外宕机,Broker 会主动回调 Producer Group 内的任意 一台机器来确认事务状态。

  • Consumer Group 用来表示一个消费消息应用,一个 Consumer Group 下包含多个 Consumer 实例,可以是多台机器,也可 以是多个进程,或者是一个进程的多个 Consumer 对象。一个 Consumer Group 下的多个 Consumer 以均摊 方式消费消息,如果设置为广播方式,那么这个 Consumer Group 下的每个实例都消费全量数据。

  • Producer 消息生产者,负责产生消息,一般由业务系统负责产生消息。

  • Consumer 消息消费者,负责消费消息,一般是后台系统负责异步消费。

  • Push Consumer Consumer 的一种,应用通常吐 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立 刻回调 Listener 接口方法。

  • Pull Consumer Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。

  • Producer Group 一类 Producer 的集合名称,返类 Producer 通常収送一类消息,且发送逻辑一致。

  • Consumer Group 一类 Consumer 的集合名称,返类 Consumer 通常消费一类消息,且发送逻辑一致。

  • Broker 消息中转角色,负责存储消息,转发消息,一般也称为 Server。在 JMS 规范中称为 Provider。

  • 广播消费 一条消息被多个 Consumer 消费,即使返些 Consumer 属亍同一个 Consumer Group,消息也会被 Consumer Group 中的每个 Consumer 都消费一次,广播消费中的 Consumer Group 概念可以讣为在消息划分方面无意义。 在 CORBA Notification 规范中,消费方式都属于广播消费。 在 JMS 规范中,相当亍 JMS publish/subscribe model

  • 集群消费 一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个 Consumer Group 有 3 个实例(可能是 3 个迕程,戒者 3 台机器),那举每个实例只消费其中的 3 条消息。 在 CORBA Notification 规范中,无此消费方式。 在 JMS 规范中,JMS point-to-point model 与之类似,但是 RocketMQ 的集群消费功能大等亍 PTP 模型。 因为RocketMQ单个Consumer Group内的消费者类似于PTP,但是一个Topic/Queue可以被多个Consumer Group 消费。

  • 顺序消息 消费消息的顺序要同収送消息的顺序一致,在 RocketMQ 中,主要指的是局部顺序,即一类消息为满足顺 序性,必须 Producer 单线程顺序发送,且发送到同一个队列,返样 Consumer 就可以按照 Producer 发送 的顺序去消费消息。

  • 普通顺序消息 顺序消息的一种,正常情冴下可以保证完全的顺序消息,但是一旦収生通信异常,Broker 重启,由于队列 总数发生变化,哈希取模后定位的队列会发化,产生短暂的消息顺序不一致。 如果业务能容忍在集群异常情况(如某个 Broker 宕机戒者重启)下,消息短暂的乱序,使用普通顺序方 式比较合适。

  • 严格顺序消息 顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover 特性,即 Broker 集群中只 要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。 如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。(依赖同步双写,主备自劢切换,自劢切换功能目前还未实现) 目前已知的应用只有数据库 binlog 同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推 荐使用普通的顺序消息。

  • Message Queue

在 RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储 单元都是定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,理论上在 100 年内不会溢出,所以认为是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来 删除。 也可以认为 Message Queue 是一个长度无限的数组,offset 就是下标。

原理

零拷贝原理 Consumer 消费消息过程,使用了零拷贝,零拷贝包含以下两种方式

  1. 使用 mmap + write 方式 优点:即使频繁调用,使用小块文件传输,效率也很高 缺点:不能很好的利用 DMA 方式,会比 sendfile 多消耗 CPU,内存安全性控制复杂,需要避免 JVM Crash 问题。

  2. 使用 sendfile 方式 优点:可以利用 DMA 方式,消耗 CPU 较少,大块文件传输效率高,无内存安全新问题。 缺点:小块文件效率低亍 mmap 方式,只能是 BIO 方式传输,不能使用 NIO。 RocketMQ 选择了第一种方式,mmap+write 方式,因为有小块数据传输的需求,效果会比 sendfile 更好。 关亍 Zero Copy 的更详细介绍,请参考以下文章 www.linuxjournal.com/article/634…

RocketMQ 选择 Linux Ext4 文件系统,原因如下: Ext4 文件系统删除 1G 大小的文件通常耗时小亍 50ms,而 Ext3 文件系统耗时约 1s 左右,且删除文件时,磁盘 IO 压力极大,会导致 IO 写入超时。 文件系统局面需要做以下调优措施 文件系统 IO 调度算法需要调整为 deadline,因为 deadline 算法在随机读情冴下,可以合幵读请求为顺序跳跃 方式,从而提高读 IO 吞吐量。

数据存储结构


关键特性

单机支持 1 万以上持久化队列

刷盘策略:异步刷盘和同步刷盘。RocketMQ 的所有消息都是持丽化的,兇写入系统 PAGECACHE,然后刷盘,可以保证内存不磁盘都有一份数据,访问时,直接从内存读取。

长轮询 Pull:RocketMQ 的 Consumer 都是从 Broker 拉消息来消费,但是为了能做到实时收消息,RocketMQ 使用长轮询方 式,可以保证消息实时性同 Push 方式一致

发送消息负载均衡:収送消息通过轮询队列的方式发送,每个队列接收平均的消息量。通过增加机器,可以水平扩展队列容量

订阅消息负载均衡:Consumer 数量要小亍等亍队列数量,如果 Consumer 超过队列数量,那举多余的 Consumer 将不能消费消息

单队列并行消费:单队列幵行消费采用滑劢窗口方式并行消费

消息消费失败,定时重试

HA,同步双写/异步复制:异步复制的实现思路非常简单,Slave 启动一个线程,不断从 Master 拉取 Commit Log 中的数据,然后在异步构建出 Consume Queue 数据结构。整个实现过程基本同 Mysql 主从同步类似

socket--》java 堆 --》PageCache 物理内存 --》磁盘。

96G内存,1k1条消息,可缓存一亿条消息

基于netty通信

同一客户端连接,可多线程发送。-- 连接复用

参考:

rocketmq前世今生:blog.csdn.net/weixin_3388…

官网地址:rocketmq.apache.org/docs/quick-…

rocketmq开发者指南:github.com/apache/rock…

rocketmq开发手册3.2.4.pdf