本文涉及的 OpenDAL 源码 Commit: 6980cd1
先来聊聊该怎么读 OpenDAL 源码?
坦白说,我也是最近才理清楚 OpenDAL 的源码和其调用关系,之前都是一知半解。
从 Operator
开始
我们所有的 IO 操作都是围绕着 Operator
展开的,先来看下 Operator
是怎么构建的。以 main.rs
为例,首先我们创建了一个基于文件系统的 Backend Builder
;随后将其构建为 accessor
(实现了 Accessor
trait); 我们又将该 accessor
传入了 OperatorBuilder::new
,最后调用了 finish
。
OpenDAL 通过
Accessor
trait 统一了不同存储后端(Backend)的行为,并向上层暴露统一的 IO 接口,例如create_dir
,read
,write
等。
use opendal::services::Fs;
use opendal::Operator;
#[tokio::main]
async fn main() -> Result<()> {
// Create fs backend builder.
let mut builder = Fs::default();
// Set the root for fs, all operations will happen under this root.
//
// NOTE: the root must be absolute path.
builder.root("/tmp");
let accessor = builder.build()?;
let op: Operator = OperatorBuilder::new(accessor)?.finish();
Ok(())
}
在 OperatorBuilder::new
发生了什么
我们传入的 accessor
在调用 new
时,被追加了两层 Layer
,并在调用 finish
时,又被追加了一层内部 Layer
。追加 Layer
后,当我们调用 Operator
暴露出来的接口时,调用会从最外层 CompleteLayer
开始,并最终抵达最内层 FsAccessor
。
FsAccessor
ErrorContextLayer
CompleteLayer
^
|
| Invoking (`read`, `reader_with`, `stat`...)
impl<A: Accessor> OperatorBuilder<A> {
/// Create a new operator builder.
#[allow(clippy::new_ret_no_self)]
pub fn new(accessor: A) -> OperatorBuilder<impl Accessor> {
// Make sure error context layer has been attached.
OperatorBuilder { accessor }
.layer(ErrorContextLayer)
.layer(CompleteLayer)
}
...
/// Finish the building to construct an Operator.
pub fn finish(self) -> Operator {
let ob = self.layer(TypeEraseLayer);
Operator::from_inner(Arc::new(ob.accessor) as FusedAccessor)
}
}
TL;DR 说了半天其实想强调一下,代码应该从
CompleteLayer
开始读(顿悟
背景知识补充
这里我们补充一些必要的上下文信息,以便理解后文内容。
LruCacheLayer
目前,在查询场景,我们追加了一层 LruCacheLayer
,那么我们 Operator
就如下图所示:
S3Accessor FsAccessor
ErrorContextLayer ErrorContextLayer
CompleteLayer CompleteLayer
^ ^ |
| | |
|`inner` `cache`| |
| | |
| | |
| | |
+----- LruCacheLayer -----+ |
^ |
| |
| |
| v
| FileReader::new(oio::TokioReader<tokio::fs::File>)
|
Invoking(`reader`, `reader_with`)
以 read
接口为例,LruCacheLayer
会将 S3 的文件缓存到文件系统中, 并向上层返回缓存的基于文件系统的 Box<dyn oio::Read> (FileReader::new(oio::TokioReader<tokio::fs::File>)
);当然如果读取的文件不存在于缓存时,则先全量从 S3 加载文件至本地的文件系统中。
struct LruCacheLayer {
inner: Operator, // S3Backend
cache: Operator, // FsBackend
index: CacheIndex
}
impl LayeredAccessor for LruCacheLayer {
...
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
if self.index.hit(path, args) {
// Returns `Box<dyn oio::Read>`
self.cache.read(path, args).await
} else {
// Fetches cache and stores...
}
}
...
}
Copy From
的场景
在 Copy From
场景,我并没有加这一层 LruCacheLayer
。那么我们 Operator
就如下图所示:
S3Accessor
ErrorContextLayer
CompleteLayer
▲ │
│ │
│ │
│ ▼
│ RangeReader::new(IncomingAsyncBody)
│
Invoking (`reader`, `reader_with`)
在使用 RangeReader 时遇到的问题
从构建 ParquetRecordBatchStream 说起
在 Copy From
中,我们拿到文件信息后,首先会调用 operator.reader
返回一个实现 AsyncReader
+ AsyncSeek
的 reader
,再套一层 BufReader
;最终将该 reader
传入至 ParquetRecordBatchStreamBuilder
中。
这里面
BufReader
也是多此一举,BufReader
每一次seek
后都会清空内部缓存区,所以其实没有获得任何性能上的收益。
...
let reader = operator
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
let buf_reader = BufReader::new(reader.compat());
let builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
.await
.context(error::ReadParquetSnafu)?;
let upstream = builder
.build()
.context(error::BuildParquetRecordBatchStreamSnafu)?;
...
ParquetRecordBatchStream::new
读取元信息
读取元信息逻辑如下,首先调用 seek(SeekFrom::End(-FOOTER_SIZE_I64))
,读取 FOOTER_SIZE
字节后解析出 metadata_len
;随后再一次调用 seek
,并读取 metadata_len
字节后解析出元信息。
impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
async move {
self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;
let mut buf = [0_u8; FOOTER_SIZE];
self.read_exact(&mut buf).await?;
let metadata_len = decode_footer(&buf)?;
self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
.await?;
let mut buf = Vec::with_capacity(metadata_len);
self.take(metadata_len as _).read_to_end(&mut buf).await?;
Ok(Arc::new(decode_metadata(&buf)?))
}
.boxed()
}
}
真正的问题
到上面为止,都是一些小问题。真正比较棘手的问题发生在这里,这里变量 stream
就是我们上面构建的 ParquetRecordBatchStream
,当我们调用 next 时,ParquetRecordBatchStream
会调用多次 reader
(RangeReader
)的 seek
和 read
。然而每次调用 seek
都会重置 RangeReader 的内部状态(丢弃掉之前的字节流),并在下次调用 read 时,重新发起一个远程请求(后端为 S3 的场景)(相关请参考:issue 和讨论)。
ParquetRecordBatchStream
在取回每列数据时:会先调用 RangeReaderseek
,随后调用read
读取一些字节。那么总共需要发起的远程调用次数为RowGroup 数
乘上RowGroup 内列的数
。我们 800KiB 包含了 50 个RowGroup
和 12 列,也就是发起了 600 次 S3 get 请求!
pub async fn copy_table_from(
...
while let Some(r) = stream.next().await {
let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
let vectors =
Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;
pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();
let columns_values = fields
.iter()
.cloned()
.zip(vectors)
.collect::<HashMap<_, _>>();
pending.push(self.inserter.handle_table_insert(
InsertRequest {
catalog_name: req.catalog_name.to_string(),
schema_name: req.schema_name.to_string(),
table_name: req.table_name.to_string(),
columns_values,
},
query_ctx.clone(),
));
if pending_mem_size as u64 >= pending_mem_threshold {
rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?;
}
}
...
读一读 RangeReader
的源码
看看 self.poll_read()
RangeReader
其 self.state
初始值为 State::Idle
,首先我们假设 self.offset
为 Some(0)
;随后 self.state
被设置为 State::SendRead(BoxFuture<'static, Result<(RpRead, R)>>)
, 并再次调用 self.poll_read(cx, buf)
。
impl<A, R> oio::Read for RangeReader<A, R>
where
A: Accessor<Reader = R>,
R: oio::Read,
{
fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
...
match &mut self.state {
State::Idle => {
self.state = if self.offset.is_none() {
// Offset is none means we are doing tailing reading.
// we should stat first to get the correct offset.
State::SendStat(self.stat_future())
} else {
State::SendRead(self.read_future())
};
self.poll_read(cx, buf)
}
...
}
}
}
在 self.read_future()
发生了什么
显而易见,self.read_future()
返回了一个 BoxedFuture
;在 BoxedFuture
中调用底层的 Accessor
的 read
接口( acc.read(&path, op).await
)。Accessor
可以是 S3 的存储后端实现,也可以是 OSS 实现等;在我们场景中,这个 Accessor
是 S3 存储后端,那么当它的 read
接口被调用时,会建立取回文件的 TCP 连接,将来自 S3 的响应以字节流的形式返回给上层。
impl<A, R> RangeReader<A, R>
where
A: Accessor<Reader = R>,
R: oio::Read,
{
fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, R)>> {
let acc = self.acc.clone();
let path = self.path.clone();
let mut op = self.op.clone();
// cur != 0 means we have read some data out, we should convert
// the op into deterministic to avoid ETag changes.
if self.cur != 0 {
op = op.into_deterministic();
}
// Alter OpRead with correct calculated range.
op = op.with_range(self.calculate_range());
Box::pin(async move { acc.read(&path, op).await })
}
...
}
书接上文 self.poll_read()
到此为止,poll_read
还没有返回;在上文中 self.poll_read()
被再次调用,此时 self.state
为 State::SendRead(BoxFuture<'static, Result<(RpRead, R)>>)
。这里的 ready!(Pin::new(fut).poll(cx))
返回值就是上文中 acc.read(&path, op).await
调用的返回值。(对于 S3 存储后端,远程调用发生在这里)随后内部状态 self.poll_read
被设置为 State::Read(r)
,并再次调用 self.poll_read()
。再次进入 self.poll_read
时,RangeReader
内部状态被设置为 State::Read(R)
。这里的 R(r)
便是读取请求响应的字节流,对于 S3 存储后端,(Pin::new(r).poll_read(cx, buf)
将 TCP 缓冲区的字节数据写入到上层应用中。
impl<A, R> oio::Read for RangeReader<A, R>
where
A: Accessor<Reader = R>,
R: oio::Read,
{
fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
// Sanity check for normal cases.
if buf.is_empty() || self.cur >= self.size.unwrap_or(u64::MAX) {
return Poll::Ready(Ok(0));
}
match &mut self.state {
...
State::SendRead(fut) => {
let (rp, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
// If read future returns an error, we should reset
// state to Idle so that we can retry it.
self.state = State::Idle;
err
})?;
// Set size if read returns size hint.
if let Some(size) = rp.size() {
if size != 0 && self.size.is_none() {
self.size = Some(size + self.cur);
}
}
self.state = State::Read(r);
self.poll_read(cx, buf)
}
State::Read(r) => match ready!(Pin::new(r).poll_read(cx, buf)) {
Ok(0) => {
// Reset state to Idle after all data has been consumed.
self.state = State::Idle;
Poll::Ready(Ok(0))
}
Ok(n) => {
self.cur += n as u64;
Poll::Ready(Ok(n))
}
Err(e) => {
self.state = State::Idle;
Poll::Ready(Err(e))
}
},
}
}
}
最后看下 self.poll_seek()
还记得刚才我们 RangeReader
内部状态吗?没错,是 State::Reader(R)
。如果我们在 read
之后在调用 seek
,RangeReader
内部的字节流会被丢弃,状态重新设置为 State::Idle
。也就是说,在每次 seek
调用后再次调用 read
,RangeReader
便会请求底层 Accessor 的 read
接口 (acc.read(&path, op).await)
发起一个远程调用,返回一个包含 [Pos, size)
的 Reader
;对于 S3 存储后端,调用这个接口的开销是非常昂贵的(TTFB 通常高达百毫秒)。
另外还有一个性能相关的重点,当我们尝试 SeekFrom::End()
的时,且 self.size
未知时,会有一次额外的 stat 操作。self.poll_seek()
调用后 self.cur
会被设置为 base.checked_add(amt)
。
总结
我们通过一个快速修复将导入文件的
RowGroup
数量从 50 改为 1,尽管如此,目前还需要 12 次远程调用。后续我们会为上游 OpenDAL 贡献一个BufferReader
(详见 RFC[5]),尽可能地避免连续调用RangeReader
的seek
read
后带来的远程调用(特定场景下可以完全避免)。OpenDAL 调用
seek
后会重置内部状态,下一次调用read
会有一次远程调用请求(后端为 S3 的场景)。(相关请参考 issue[4] 和讨论[5])std::io::BufReader
和tokio::io::BufReader
都会在seek
后清除内部Buffer
,如果希望继续读Buffer
内的内容,应该调用seek_relative
。
关于 Greptime
Greptime 格睿科技专注于为可观测、物联网及车联网等领域提供实时、高效的数据存储和分析服务,帮助客户挖掘数据的深层价值。目前基于云原生的时序数据库 GreptimeDB 已经衍生出多款适合不同用户的解决方案,更多信息或 demo 展示请联系下方小助手(微信号:greptime)。
欢迎对开源感兴趣的朋友们参与贡献和讨论,从带有 good first issue 标签的 issue 开始你的开源之旅吧~期待在开源社群里遇见你!添加小助手微信即可加入“技术交流群”与志同道合的朋友们面对面交流哦~
Star us on GitHub Now: https://github.com/GreptimeTeam/greptimedb
Twitter: https://twitter.com/Greptime
Slack: https://greptime.com/slack