消息队列已经逐渐成为企业应用系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,当前用的比较多的消息中间件有 RabbitMQ、RocketMQ、ActiveMQ、Kafka等。
使用场景
- 应用解耦:从系统之间的主动调用变成消息的订阅发布,从而解耦。
- 异步处理:系统间串行逐个同步调用非常耗时且不稳定(当任一系统调用报错或超时都将导致整个逻辑失败),使用消息队列做到异步、并行处理。当然,前提是返回的结果不依赖于处理的结果。
- 流量削峰:大量请求(例如秒杀)直接打到数据库将会导致数据库挂掉,通过将请求先转发到消息队列中,然后系统按照数据库所能处理的并发量从消息队列中逐步拉取消息进行消费。
- 消息通讯:消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。有基于消息队列的RPC框架,例如rabbitmq-jsonrpc(基于rabbitmq低延迟特性)。
- 日志处理:比较熟悉的 ELK + Kafka 日志方案。业务系统向Kafka中推送日志消息。Logstash从Kafka中消费日志信息,并存入Elasticsearch中。Elasticsearch负责日志的搜索和统计工作。Kibana是基于Elasticsearch的数据可视化组件。
其中,应用解耦和异步处理比较核心。
常见问题
- 系统可用性降低:在原来的系统架构中加入了MQ,万一MQ挂了,整套系统就挂了。所以,消息队列一定要做好高可用。
- 系统复杂度提高:需要考虑和关注的问题变多了,消息如何保证不丢失(消息确认和持久化)?消息如何保证不被重复消费(消息添加唯一标识,添加排重记录。业务层做幂等判断)?需要消息顺序的业务场景如何处理等。
- 一致性问题:在使用MQ时,一定要保证数据的最终一致性。
消息的投递方式
push
优点:保证及时性
缺点:受限于消费者的消费能力,可能造成消息的堆积。
pull
优点:主动权掌握在消费方,可根据自己的消费速度进行消息的拉取。
缺点:消费方不知道什么时候获取新的消息,会有消息延迟和忙等。
消息的持久化方式
- 分布式KV存储
- 关系型数据库DB
- 文件系统
从存储效率上来说,文件系统>分布式KV存储>关系型数据库DB。另外,从消息中间件本身定义考虑,应该尽量减少对于外部第三方中间件的依赖。所以个人觉得采用消息刷盘至中间件所部署的虚拟机/物理机的的文件系统来做持久化比较合适。
Kafka 相关知识
由于公司 MQ 中间件选型使用的是 Kafka,所以记录一下 Kafka 相关的知识。
常见名词及概念
- Borker:节点,Kafka 集群中,通过 Zookeeper 选举某个 Broker 作为 Controller ,用来进行 leader election 以及 各种 failover,保证高可用。
- Producer:消息提供者,根据指定算法,将消息发送到指定的分区中去。
- Consumer:消息消费者,负责消费消息。
- Consumer Group:每个 Consumer 都属于一个 Consumer group,每条消息只能被 Consumer group 中的一个 Consumer 消费,但可以被多个 Consumer group 消费
- Topic:特指 Kafka 处理的消息源(feeds of messages)的不同分类。
- Partition:Topic 物理上的分组(分区),一个 Topic 可以分为多个 Partition 。每个 Partition 都是一个有序的队列。Partition 中的每条消息都会被分配一个有序的 id(offset)每个replica分布在不同的broker节点上。多个partition需要选举出leader partition,leader partition负责读写,并由zookeeper负责 fail over。
- Segement:消息片段,一个Partition可以包含多个Segement。
- ISR:In Sync Replicas(正在同步的集合),只有在 ISR 中的副本才能被拿来替代主副本。ISR = Leader + 没有落后太多的副本。
- AR:副本全集,等于 ISR + OSR。
常用配置
- acks:提供消息发送是否发送成功的可靠性保证,是写入leader分区完成就响应给producter成功,还是写入所有分区成功再响应。
- replication.factor:给topic设置的参数,表示分区的副本数,一般这个值都要大于1。
- retries:写入重试次数,一般设置为 MAX,表示无限重试。
- batch.size:采用分批发送消息的策略,减少请求数。此参数设置缓冲空间的大小。
- linger.ms:配置生产者在发送请求之前等待时间,希望更多的消息填补到未满的批中。在低负载的情况下,设置此值大于0,以少量的延迟代价换取更少、更有效的请求。
- buffer.memory:控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。
- min.insync.replicas;在kafka服务端设置的参数,表示ISR分区集合的最小值,一般都要大于1,这样保证leader挂了还有一个follower。
会不会丢数据
Broker
比较常见的一种,leader分区的消息还没同步到replica分区,所在的Broker宕机了就会导致还未同步的消息丢失。可以通过给Topic设置replication.factor参数大于1,保证每个分区至少有两个副本。在kafka服务端设置min.insync.replicas参数大于1,保证leader分区感知到至少有一个follower还跟自己保持联系,没掉队。
提供者端
在produer端设置acks=all,要求消息写入所有replica之后,才能认为是写入成功。同时设置retries=MAX,保持无限重试。这两个设置可以保证消息在提供者端一定不会丢失。
消费者端
消息还没处理完成,kafka自动提交offset会导致消费者端丢消息。那么只要关闭自动提交offset,在处理完成之后自己手动提交offset,就可以保证数据不会丢。但是有可能导致重复消费的问题,比如消息刚处理完,还没提交offset,结果自己挂了,就会导致此条消息还会被消费一次,就得业务端做幂等判断了。