Aliyun Log Java Producerbb
📚研究大数据、高并发场景下的高性能类库--Aliyun Log Java Producer
本文研究一款被称为大数据、高并发场景下的高性能类库- Aliyun Log Java Producer。这个框架是一个 Java 客户端,通过攒批往日志服务(SLS)写入日志数据。
阅读源码,学习架构设计,从而应用到实际工作中。
📑一、Aliyun Log Java Producer 介绍
来自官网的介绍
在海量数据、资源有限的前提下,写入端要达到目标吞吐量需要实现复杂的控制逻辑,包括多线程、缓存策略、批量发送等,另外还要充分考虑失败重试的场景。
- 异步非阻塞
- 线程安全
- 优雅关闭
- ......
这些特性确实值得研究学习。
📒二、类库中的设计
我们先理解一些设计,然后再深入代码细节。
2.1 攒批发送
如果每次写一条日志数据,都发起一次 http 的调用,在海量日志下,性能肯定撑不住。 通过攒批,达到一定数量后进行发送,虽然会有一定时间的延迟,但是整体性能得到提升。
2.2 异步执行
异步提升吞吐率。
2.3 水位线
攒批是在内存中缓存了日志数据,会占用内存,避免资源被耗尽,设置一个最大水位线。
2.4 分而治之
发送成功和失败的结果将放入不同的队列。针对不同的任务用不同的线程、队列处理是非常合理的一种手段。
进入源码之前再做一些知识准备。
📖三、知识准备
3.1 SettableFuture
guava 对 JDK#Future 的扩展工具类,可以手动设置返回结果到 Future。
这个类库充分使用了这个能力。
过程如下:
日志被提交了,立即返回一个 ListenableFuture 。不用同步等待结果,增加了日志发送的吞吐率。
直接使用 Callback 则更加简单。
📜四、源码理解
先对几个关键类做一个简单的介绍:
类 | 作用 |
LogAccumulator | 攒批容器;控制缓存数据水位线 |
RetryQueue | 失败待重试的 ProducerBatch |
BatchHandler | 处理发送成功或失败的 batch;进行回调 |
Mover | 循环地将 LogAccumulator 和 RetryQueue 中的超时 batch 处理 |
producerBatch | 批对象 |
4.1 攒批发送
相关代码
com.aliyun.openservices.aliyun.log.producer.internals.LogAccumulator#doAppend
appendToHolder 会进行攒批,满足条件进行发送。部分核心代码如下:
4.2 异步非阻塞
异步非阻塞的关键点:
- 使用 SettableFuture
- 将 callback、future 包装成 Thunk
- 攒批发送后,再处理 Thunk
发送逻辑讲解完成,下面分析异步结果处理。
4.3 异步线程结果处理
发送结果会根据是否成功等将其添加到不同的队列。
代码逻辑如下:
注:失败的 producerBatch 只会重试一次
发送接口返回的 ListenableFuture 是该批次的 result
4.4 过期批处理
触发发送批的逻辑,没有加入时间考虑。如果没有达到发送条件,日志数据会一直保留。因此由独立线程去处理 (Mover 线程)
Mover 线程会不断地扫描过期时间的任务进行执行
4.5 优雅关闭
框架针对优雅关闭做的努力, 考虑了很多细节。保证 close 方法退时,producer 缓存的所有数据都能被处理。
按照数据流动方向依次关闭队列和线程来达到优雅关闭、安全退出
所有线程 close() 方法大致逻辑:
- 修改 closed=true,表示不再接受新任务
- join(timeoutMs) 等待该线程任务执行结束
线程池的关闭逻辑
具体可以阅读 close 方法。
com.aliyun.openservices.aliyun.log.producer.Producer#close()
4.6 补充部分知识
通过 Semaphore 控制缓存待发送数据的内存大小
private final Semaphore memoryController
通过 ProducerBatchHolder 缓存攒批日志数据 ProducerBatchHolder 中有一个 ConcurrentHashMap#putIfAbsent 的用法 如果不存在,那么会向 map 中添加该键值对,并返回 null。
如果已经存在,那么不会覆盖已有的值,直接返回已经存在的值。
private final ConcurrentMap<GroupKey, ProducerBatchHolder> batches
通过上面代码实现了线程安全。
4.7 避免空转的锁竞争
Mover 线程如果空转会增加 ProducerBatchHolder 的锁竞争。考虑增加一定的休眠时间,下面这段代码就是这么做的,确实考虑很周全!!!
4.8 不足之处
- 异步发送的日志结果是批次执行的结果;可以增加一个 requestId 的透传。处理有点粗糙
到此分析结束。
✒️五、最后
这个工程还是非常不错的,各方面考虑得也很周全,值得推荐学习。
🤔将学习的技术应用到自己的工程中,从模仿到超越!
SLS 和 该类库的一些资料参考
- 使用参考案例代码:Aliyun LOG Java Producer 快速入门
- github地址: Aliyun LOG Java Producer
Loading...