物理部署
- Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
- Broker分为Master与Slave(主从配置),一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。
- Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署
- Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
编译安装
4.2.0版本:
- git clone https://github.com/apache/rocketmq.git
- 编译 mvn -Prelease-all -DskipTests clean install -U
旧版本: - git clone https://github.com/apache/incubator-rocketmq.git
- mvn clean package install -Prelease-all assembly:assembly -U
注意安装要求,maven是3.2.X否则可能会有问题(我的3.5.3运行quickstart的Consumer就有问题)
启动关闭:
cd distribution/target/apache-rocketmq
- nohup sh bin/mqnamesrv &
- nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &,不加autoCreateTopicEnable=true我这边启动Producer报No route info of this topic错
- 使用tail -f ~/logs/rocketmqlogs/XXX.log或jps查看是否启动成功,Ctrl c退出
- 使用sh bin/mqshutdown broker和sh bin/mqshutdown namesrv关闭
jar包依赖
1 | commons-cli-1.2.jar |
三种方式
- 同步——用在大规模分布场景下,像重要的通知消息,短信通知,短信市场系统等
1 | public class SyncProducer { |
- 异步——实时的业务场景
可以看出多了发送完的回调函数
1 | public class AsyncProducer { |
- 单向传输——对可靠性要求一般的场景,像日志搜集
1 | public class OnewayProducer { |
逻辑架构
- 队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合。
有序消息
生产者
从运行结果看,默认四条消息队列(queueid0-3)。消息的id(0~9)会发送到不同的消息队列上
1 | public static void main( String[] args ) throws Exception |
消费者
消费者消费指定主题的消息,并注册监听器,打印消息并根据收到的消息顺序返回不同的消费状态
- 实现MessageListenerOrderly的consumeMessage方法
- CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
- CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
- CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费
1 | public static void main (String[] args) throws Exception { |
广播模式
- 消费者设置:
//set to broadcast mode consumer.setMessageModel(MessageModel.BROADCASTING);
定时消息
- 生产者
1 | // This message will be delivered to consumer 10 seconds later. |
- 消费者
消费者监听器中可以取得消息的存储时间,进而可以计算出延迟的时间
1 | for (MessageExt message : messages) { |
批量处理
- 能够提高小的消息的传送效率
- 限制1:same topic, same waitStoreMsgOK and no schedule support.
- 限制2:the total size of the messages in one batch should be no more than 1MiB.
1 | String topic = "BatchTest"; |
- 太大则需要分块生产
1 | public class ListSplitter implements Iterator<List<Message>> { |
消息过滤选择
- 一条消息只能有一个tag,因此通过consumer.subscribe(“TOPIC”, “TAGA || TAGB || TAGC”);这种方式的选择性不强
- 可以通过向message中添加1到n条属性,消费者对属性或属性组合进行逻辑判断来过滤消息
1 | // 生产者Set some properties. |