欢迎参与 8 月 1 日中午 11 点的线上分享,了解 GreptimeDB 联合处理指标和日志的最新方案! 👉🏻 点击加入

Skip to content
On this page
技术
2023-12-26

S3 调用次数减少 98% | 探索 OpenDAL RangeReader 的奥秘

GreptimeDB 把 OpenDAL 作为统一的数据访问层。前段时间同事告诉我:数据库执行 `Copy From` 语句从 S3 导入一个 800 KiB 的 Parquet 文件需要 10s;经过一些调查,又研读了相关 `Reader` 的文档和具体实现后,目前我们用一个快速修复把导入时间优化到了 1s 内,后续会为上游实现 `BufferReader` 并做进一步的优化。本文做一个记录和简单的总结。

本文涉及的 OpenDAL 源码 Commit: 6980cd1

先来聊聊该怎么读 OpenDAL 源码?

坦白说,我也是最近才理清楚 OpenDAL 的源码和其调用关系,之前都是一知半解。

Operator 开始

我们所有的 IO 操作都是围绕着 Operator 展开的,先来看下 Operator 是怎么构建的。以 main.rs 为例,首先我们创建了一个基于文件系统的 Backend Builder;随后将其构建为 accessor(实现了 Accessor trait); 我们又将该 accessor 传入了 OperatorBuilder::new,最后调用了 finish

OpenDAL 通过 Accessor trait 统一了不同存储后端(Backend)的行为,并向上层暴露统一的 IO 接口,例如 create_dir, read, write 等。

rust
use opendal::services::Fs;
use opendal::Operator;

#[tokio::main]
async fn main() -> Result<()> {
    // Create fs backend builder.
    let mut builder = Fs::default();
    // Set the root for fs, all operations will happen under this root.
    //
    // NOTE: the root must be absolute path.
    builder.root("/tmp");

    let accessor = builder.build()?;
    let op: Operator = OperatorBuilder::new(accessor)?.finish();

    Ok(())
}

OperatorBuilder::new 发生了什么

我们传入的 accessor 在调用 new 时,被追加了两层 Layer,并在调用 finish 时,又被追加了一层内部 Layer。追加 Layer 后,当我们调用 Operator 暴露出来的接口时,调用会从最外层 CompleteLayer 开始,并最终抵达最内层 FsAccessor

go
FsAccessor
ErrorContextLayer
CompleteLayer
^
|
| Invoking (`read`, `reader_with`, `stat`...)
rust
impl<A: Accessor> OperatorBuilder<A> {
    /// Create a new operator builder.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(accessor: A) -> OperatorBuilder<impl Accessor> {
        // Make sure error context layer has been attached.
        OperatorBuilder { accessor }
            .layer(ErrorContextLayer)
            .layer(CompleteLayer)
    }

    ...
    
    /// Finish the building to construct an Operator.
    pub fn finish(self) -> Operator {
        let ob = self.layer(TypeEraseLayer);
        Operator::from_inner(Arc::new(ob.accessor) as FusedAccessor)
    }
}

TL;DR 说了半天其实想强调一下,代码应该从 CompleteLayer 开始读(顿悟

背景知识补充

这里我们补充一些必要的上下文信息,以便理解后文内容。

LruCacheLayer

目前,在查询场景,我们追加了一层 LruCacheLayer,那么我们 Operator 就如下图所示:

rust
S3Accessor                FsAccessor
ErrorContextLayer         ErrorContextLayer
CompleteLayer             CompleteLayer
    ^                         ^  |
    |                         |  |
    |`inner`           `cache`|  |
    |                         |  |
    |                         |  |
    |                         |  |
    +----- LruCacheLayer -----+  |
                 ^               |
                 |               |
                 |               |
                 |               v
                 |               FileReader::new(oio::TokioReader<tokio::fs::File>)
                 |
                 Invoking(`reader`, `reader_with`)

read 接口为例,LruCacheLayer 会将 S3 的文件缓存到文件系统中, 并向上层返回缓存的基于文件系统的 Box<dyn oio::Read> (FileReader::new(oio::TokioReader<tokio::fs::File>));当然如果读取的文件不存在于缓存时,则先全量从 S3 加载文件至本地的文件系统中。

rust
struct LruCacheLayer {
  inner: Operator, // S3Backend
  cache: Operator, // FsBackend
  index: CacheIndex
}

impl LayeredAccessor for LruCacheLayer {
  ...
  async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        if self.index.hit(path, args) {
          // Returns `Box<dyn oio::Read>`
          self.cache.read(path, args).await 
        } else {
          // Fetches cache and stores...
        }
  }
  ...
}

Copy From 的场景

Copy From 场景,我并没有加这一层 LruCacheLayer。那么我们 Operator 就如下图所示:

plain
S3Accessor
ErrorContextLayer
CompleteLayer
   ▲    │
   │    │
   │    │
   │    ▼
   │    RangeReader::new(IncomingAsyncBody)

   Invoking (`reader`, `reader_with`)

在使用 RangeReader 时遇到的问题

从构建 ParquetRecordBatchStream 说起

Copy From 中,我们拿到文件信息后,首先会调用 operator.reader 返回一个实现 AsyncReader + AsyncSeekreader,再套一层 BufReader;最终将该 reader 传入至 ParquetRecordBatchStreamBuilder 中。

这里面 BufReader 也是多此一举,BufReader 每一次 seek 后都会清空内部缓存区,所以其实没有获得任何性能上的收益。

rust
  ...
  let reader = operator
      .reader(path)
      .await
      .context(error::ReadObjectSnafu { path })?;

  let buf_reader = BufReader::new(reader.compat());

  let builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
      .await
      .context(error::ReadParquetSnafu)?;

  let upstream = builder
      .build()
      .context(error::BuildParquetRecordBatchStreamSnafu)?;
  
  ...

ParquetRecordBatchStream::new 读取元信息

读取元信息逻辑如下,首先调用 seek(SeekFrom::End(-FOOTER_SIZE_I64)) ,读取 FOOTER_SIZE 字节后解析出 metadata_len;随后再一次调用 seek,并读取 metadata_len 字节后解析出元信息。

rust
impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
        const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
        async move {
            self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;

            let mut buf = [0_u8; FOOTER_SIZE];
            self.read_exact(&mut buf).await?;

            let metadata_len = decode_footer(&buf)?;
            self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
                .await?;

            let mut buf = Vec::with_capacity(metadata_len);
            self.take(metadata_len as _).read_to_end(&mut buf).await?;

            Ok(Arc::new(decode_metadata(&buf)?))
        }
        .boxed()
    }
}

真正的问题

到上面为止,都是一些小问题。真正比较棘手的问题发生在这里,这里变量 stream 就是我们上面构建的 ParquetRecordBatchStream,当我们调用 next 时,ParquetRecordBatchStream 会调用多次 readerRangeReader)的 seekread。然而每次调用 seek 都会重置 RangeReader 的内部状态(丢弃掉之前的字节流),并在下次调用 read 时,重新发起一个远程请求(后端为 S3 的场景)(相关请参考:issue讨论)。

ParquetRecordBatchStream 在取回每列数据时:会先调用 RangeReader seek ,随后调用 read 读取一些字节。那么总共需要发起的远程调用次数为 RowGroup 数 乘上 RowGroup 内列的数。我们 800KiB 包含了 50 个 RowGroup 和 12 列,也就是发起了 600 次 S3 get 请求!

rust
        pub async fn copy_table_from(
    ...
            while let Some(r) = stream.next().await {
                let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
                let vectors =
                    Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;

                pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();

                let columns_values = fields
                    .iter()
                    .cloned()
                    .zip(vectors)
                    .collect::<HashMap<_, _>>();

                pending.push(self.inserter.handle_table_insert(
                    InsertRequest {
                        catalog_name: req.catalog_name.to_string(),
                        schema_name: req.schema_name.to_string(),
                        table_name: req.table_name.to_string(),
                        columns_values,
                    },
                    query_ctx.clone(),
                ));

                if pending_mem_size as u64 >= pending_mem_threshold {
                    rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?;
                }
            }
    
    ...

读一读 RangeReader 的源码

看看 self.poll_read()

RangeReaderself.state 初始值为 State::Idle,首先我们假设 self.offsetSome(0);随后 self.state 被设置为 State::SendRead(BoxFuture<'static, Result<(RpRead, R)>>), 并再次调用 self.poll_read(cx, buf)

rust
impl<A, R> oio::Read for RangeReader<A, R>
where
    A: Accessor<Reader = R>,
    R: oio::Read,
{
    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
        ...
        match &mut self.state {
            State::Idle => {
                self.state = if self.offset.is_none() {
                    // Offset is none means we are doing tailing reading.
                    // we should stat first to get the correct offset.
                    State::SendStat(self.stat_future())
                } else {
                    State::SendRead(self.read_future())
                };

                self.poll_read(cx, buf)
            }
            ...
        }
    }
}

self.read_future() 发生了什么

显而易见,self.read_future() 返回了一个 BoxedFuture;在 BoxedFuture 中调用底层的 Accessorread 接口( acc.read(&path, op).await )。Accessor 可以是 S3 的存储后端实现,也可以是 OSS 实现等;在我们场景中,这个 Accessor 是 S3 存储后端,那么当它的 read 接口被调用时,会建立取回文件的 TCP 连接,将来自 S3 的响应以字节流的形式返回给上层。

rust
impl<A, R> RangeReader<A, R>
where
    A: Accessor<Reader = R>,
    R: oio::Read,
{
    fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, R)>> {
        let acc = self.acc.clone();
        let path = self.path.clone();

        let mut op = self.op.clone();
        // cur != 0 means we have read some data out, we should convert
        // the op into deterministic to avoid ETag changes.
        if self.cur != 0 {
            op = op.into_deterministic();
        }
        // Alter OpRead with correct calculated range.
        op = op.with_range(self.calculate_range());

        Box::pin(async move { acc.read(&path, op).await })
    }

    ...
}

书接上文 self.poll_read()

到此为止,poll_read 还没有返回;在上文中 self.poll_read() 被再次调用,此时 self.stateState::SendRead(BoxFuture<'static, Result<(RpRead, R)>>)。这里的 ready!(Pin::new(fut).poll(cx)) 返回值就是上文中 acc.read(&path, op).await 调用的返回值。(对于 S3 存储后端,远程调用发生在这里)随后内部状态 self.poll_read 被设置为 State::Read(r),并再次调用 self.poll_read()。再次进入 self.poll_read 时,RangeReader 内部状态被设置为 State::Read(R)。这里的 R(r) 便是读取请求响应的字节流,对于 S3 存储后端,(Pin::new(r).poll_read(cx, buf) 将 TCP 缓冲区的字节数据写入到上层应用中。

rust
impl<A, R> oio::Read for RangeReader<A, R>
where
    A: Accessor<Reader = R>,
    R: oio::Read,
{
    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
        // Sanity check for normal cases.
        if buf.is_empty() || self.cur >= self.size.unwrap_or(u64::MAX) {
            return Poll::Ready(Ok(0));
        }

        match &mut self.state {
            ...
            State::SendRead(fut) => {
                let (rp, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
                    // If read future returns an error, we should reset
                    // state to Idle so that we can retry it.
                    self.state = State::Idle;
                    err
                })?;

                // Set size if read returns size hint.
                if let Some(size) = rp.size() {
                    if size != 0 && self.size.is_none() {
                        self.size = Some(size + self.cur);
                    }
                }
                self.state = State::Read(r);
                self.poll_read(cx, buf)
            }
            State::Read(r) => match ready!(Pin::new(r).poll_read(cx, buf)) {
                Ok(0) => {
                    // Reset state to Idle after all data has been consumed.
                    self.state = State::Idle;
                    Poll::Ready(Ok(0))
                }
                Ok(n) => {
                    self.cur += n as u64;
                    Poll::Ready(Ok(n))
                }
                Err(e) => {
                    self.state = State::Idle;
                    Poll::Ready(Err(e))
                }
            },
        }
    }
}

最后看下 self.poll_seek()

还记得刚才我们 RangeReader 内部状态吗?没错,是 State::Reader(R)。如果我们在 read 之后在调用 seekRangeReader 内部的字节流会被丢弃,状态重新设置为 State::Idle。也就是说,在每次 seek 调用后再次调用 readRangeReader 便会请求底层 Accessor 的 read 接口 (acc.read(&path, op).await) 发起一个远程调用,返回一个包含 [Pos, size)Reader;对于 S3 存储后端,调用这个接口的开销是非常昂贵的(TTFB 通常高达百毫秒)。

另外还有一个性能相关的重点,当我们尝试 SeekFrom::End() 的时,且 self.size 未知时,会有一次额外的 stat 操作。self.poll_seek() 调用后 self.cur 会被设置为 base.checked_add(amt)

总结

  • 我们通过一个快速修复将导入文件的 RowGroup 数量从 50 改为 1,尽管如此,目前还需要 12 次远程调用。后续我们会为上游 OpenDAL 贡献一个 BufferReader(详见 RFC[5]),尽可能地避免连续调用 RangeReaderseek read 后带来的远程调用(特定场景下可以完全避免)。

  • OpenDAL 调用 seek 后会重置内部状态,下一次调用 read 会有一次远程调用请求(后端为 S3 的场景)。(相关请参考 issue[4] 和讨论[5])

  • std::io::BufReadertokio::io::BufReader 都会在 seek 后清除内部 Buffer,如果希望继续读 Buffer 内的内容,应该调用 seek_relative

关于 Greptime

Greptime 格睿科技专注于为可观测、物联网及车联网等领域提供实时、高效的数据存储和分析服务,帮助客户挖掘数据的深层价值。目前基于云原生的时序数据库 GreptimeDB 已经衍生出多款适合不同用户的解决方案,更多信息或 demo 展示请联系下方小助手(微信号:greptime)。

欢迎对开源感兴趣的朋友们参与贡献和讨论,从带有 good first issue 标签的 issue 开始你的开源之旅吧~期待在开源社群里遇见你!添加小助手微信即可加入“技术交流群”与志同道合的朋友们面对面交流哦~

Star us on GitHub Now: https://github.com/GreptimeTeam/greptimedb

官网:https://greptime.cn/

文档:https://docs.greptime.cn/

Twitter: https://twitter.com/Greptime

Slack: https://greptime.com/slack

LinkedIn: https://www.linkedin.com/company/greptime/

加入我们的社区

获取 Greptime 最新更新,并与其他用户讨论。