丢消息
生产者本地消息表
在发送方,我采用的是本地消息表解决方案。简单来说,就是在业务操作的过程中,在本地消息表里面记录一条待发消息,做成一个本地数据库事务。然后尝试立刻发送消息,如果发送成功,那么就把本地消息表里对应的数据删除,或者把状态标记成已发送。
如果这个时候失败了,就可以立刻尝试重试。同时,还要有一个异步的补发机制,扫描本地消息表,找出已经过了一段时间,比如说三分钟,但是还没有发送成功的待发消息,然后补发。
最后提到的异步补发机制,你可以简单理解成有一个线程定时扫描数据库,找到需要发送但是又没有发送的消息发送出去。更直观的来说,就是这个线程会执行一个类似这样的 SQL
额外增加了一个新的列,用来控制重试的间隔和重试的次数。如果最终补发都失败了就会告警。这个时候就需要人手工介入了。
那么这时候本地消息表至少有两个关键列:一个是消息体列,里面存储了消息的数据;另一个是重试机制列,里面可以只存储重试次数,也可以存储重试间隔、已重试次数、最大重试次数。剩余的列,你就根据自己的需要随便加,不关键
案其实就是把一个分布式事务转变成本地事务 + 补偿机制。
分布式事务一般不用,通过更多优雅的手段来处理这些问题,这些都是常见问题,分布式环境丢数据都很正常
通过一些手段来多次校验保证即可
消息队列自身不丢
那么 acks 就需要设置成 all。而且,也不能允许 Kafka 使用unclean 选举。
进一步考虑刷盘的问题,那么就需要调整 log.flush.interval.messages、log.flush.interval.ms 和 log.flush.scheduler.interval.ms 的值。
在关键业务上,我一般都是把 acks 设置成 all 并且禁用 unclean 选举,来确保消息发送到了消息队列上,而且不会丢失。同时 log.flush.interval.messages、log.flush.interval.ms和 log.flush.scheduler.interval.ms 三个参数会影响刷盘,这三个值我们公司设置的是10000、2000、3000。理论上来说,这种配置确实还是有一点消息丢失的可能,但是概率已经非常低了。只有一种可能,就是消息队列完成主从同步之后,主分区和 ISR 的从分区都没来得及刷盘就崩溃了,才会丢失消息。这个时候真丢失了消息,就只能人手工补发
消费者
消费者一般要保证幂等操作
异步处理要注意
一定要把消费逻辑设计成幂等的。你的微服务也要尽可能设计成幂等的,这样上游就可以利用重试来提高可用性了。另外我要说明一点,现在大多数消息中间件都声称自己实现了恰好一次(exactly once)语义,都是依赖于重试和幂等来达成的。
消息回查
MQ 提供消息回查的时候我还吐槽过这就是中间件研发者闲着没事给自己刷 KPI。 ——非常规业务手段。可能在阿里系是常规手段
消息回查机制是指消息队列允许你在发送消息的时候,先发一个准备请求,里面带着你的消息。这个时候消息队列并不会把消息转交给消费者,而是当业务完成之后,你需要再发一个确认请求,这时候消息中间件才会把消息转交给消费者。
消息回查机制依赖于消息队列的支持。RocketMQ 是支持的,但是不幸的是 Kafka 和RabbitMQ 都不支持。
它的基本步骤是这样的:
整个过程到处都是亮点,我们一个一个看。
亮点一:回查实现你应该注意到了,如果在业务操作完成之后,没有发提交消息,这时候就需要回查中间件来回查。一般来说,回查中间件会异步地扫描长时间未提交的消息,然后回查业务方。
1.应用代码把准备消息发送到 topic=look_back_msg 上。里面包含业务 topic、消息体、业务类型、业务 ID、准备状态、回查接口。
2. 回查中间件消费这个 look_back_msg,把消息内容存储到数据库里。
3.应用代码执行完业务操作之后,再发送一个消息到 look_back_msg 上,带上业务类型、业务 ID 和提交状态这些信息。如果应用代码执行业务出错了,那么就使用回滚状态。
4.回查中间件查询消息内容,转发到业务 topic 上
总结
对于存储时间长的数据,回查中间件反查业务,确定删除还是提交
Loading...