KubeCon 2025 · 遇见 Greptime: 统一管理指标、日志与链路追踪等数据,重塑可观测新体验。 👉🏻 购买早鸟票

18d:21h:33m:52s
Skip to content
On this page
技术
2023-1-12

看不见的控制流 — Rust 异步取消的几点思考

本文主要介绍了一个在 GreptimeDB 中遇到的一个关于异步取消 (async cancellation) 的“奇怪”问题。

The Problem

针对这个问题,我们首先描述一个简化的场景:在一个长时间运行的测试中存在元信息损坏的问题,有一个应该单调递增的序列号出现了重复。

序列号的更新逻辑非常简单:从一个原子变量中读取当前值,然后通过异步 I/O 方法 persist_number() 将新值写入文件里,最后更新这个原子变量。整个流程都是串行化的(file 是一个独占引用)。

rust
async fn update_metadata(file: &mut File, counter: AtomicU64) -> Result<()> {
    let next_number = counter.load(Ordering::Relaxed) + 1;
    persist_number(file, next_number).await?;
    counter.fetch_add(1, Ordering::Relaxed);
}

由于一些原因,我们在这里使用了 load 函数而非 fetch_add(虽然单就这里来说可以用 fetch_add,并且用了还不会引发这次的问题🤪)。当这个更新流程在中间出现错误时,我们不希望更新内存中的计数器。我们清楚如果 persist_number() 写入文件时失败就能够从 ? 提前返回,并且会提早结束执行来传播错误,所以编码的时候会注意这些问题。

但是到了 .await 这里事情就变得奇妙了起来,因为 async cancellation 带来了一个隐藏的控制流。

Async Cancellation

async task and runtime

如果这时候你已经猜到了到底是什么引发了这个问题,可以跳过这一章节。如果没有,就让我从一些伪代码开始解释在 await point 那里到底发生了什么,以及 runtime 是如何参与其中的。

  • poll_future

首先是 poll_future ,对应到 Futurepoll 方法。我们写的异步方法都会被转化成类似这样子的一个匿名的 Future 实现。

rust
fn poll_future() -> FutureOutput {
    match status_of_the_task {
        Ready(output) => {
            // the task is finished, and we have it output.
            // some logic
            return our_output;
        },
        Pending => {
            // it is not ready, we don't have the output.
            // thus we cannot make progress and need to wait
            return Pending;
        }
    }
}

async 块通常包含其他的异步方法,比如 update_metadatapersist_number。这里把 persist_number 称为 update_metadata 的子异步任务。每个 .await 都会被展开成类似 poll_future 的东西,等待子任务的结果并继续执行。在这个例子中就是等待 persist_number 的结果返回 Ready 再更新计数器,否则不更新。

  • runtime

第二段伪代码是一个简化的 runtime,它负责轮询 (poll) 异步任务直到它们完成(考虑到接下来的文章内容,“直到……完成”这种表述并不适合于所有情况)。在 GreptimeDB 中我们使用 tokio 作为 runtime。现在的异步 runtime 可能有很多特性和功能,其中最基础的就是轮询这些任务。

rust
fn runtime(&self) {
    loop {
        let future_tasks: Vec<Task> = self.get_tasks();
        for task in tasks {
            match task.poll_future(){
                Ready(output) => {
                    // this task is finished. wake it with the result
                    task.wake(output);
                },
                Pending => {
                    // this task needs some time to run. poll it later
                    self.poll_later(task);
                }
            }
        }
    }
}

通过结合上述两个简化的 future 和 runtime 模型,我们得到如下这个循环(真实的 runtime 非常复杂,这里为了内容集中省略了很多)。

rust
fn run() -> Output {
    loop {
        if let Ready(result) = task.poll() {
            return result;
        }
    }
}

需要强调的是,每个 .await 都代表着一个或者多个函数调用 (调用到 poll() 或者说是 poll_future() )。这就是标题中“隐藏的控制流”,以及 cancellation 发生的地方。

我们再看一段简单的程序来探测 runtime 的行为(可以直接在 playground 里面运行这段代码):

rust
use tokio::time::{sleep, Duration, timeout};

#[tokio::main]
async fn main() {
    let f = async {
        print(1).await;
        println!("1 is done");
        print(2).await;
        println!("2 is done");
        print(3).await;
        println!("3 is done");
    };

    if let Err(_) = timeout(Duration::from_millis(150), f).await {
        println!("timeout");
    }
    
    sleep(Duration::from_millis(300)).await;
    println!("exit")
}

async fn print(val: u32) {
    sleep(Duration::from_millis(100)).await;
    println!("val is {}", val);
}

只要花几分钟时间猜测一下上方代码的输出结果,如果和下面的一致,相信你已经知道问题出在哪里。

val is 1
1 is done
timeout
exit

之后的语句都因为超时而被 runtime 取消执行了。

这个问题其中的原理并不复杂,但是(对我来说)能够定位到它并不轻易。在把其他问题都排除掉之后我知道问题就发生在这里,在这个 .await 上。也许是太多次成功的异步函数调用麻痹了注意,亦或是我的心智模型中没有把这两点联系起来,联想到这点着实费了一番心思。

cancellation

目前为止的内容是问题复盘的标准流程,接下来,让我们来展开讨论一下 cancellation,它是与 runtime 的行为相关的。

虽然 Rust 中的很多 runtime 都有类似的行为,但是这不是一个必须的特性,比如这个自己写的 runtime 就不支持 cancellation。因为问题发生在 tokio 上,因此这里会以它为例,而其他的 runtime 也是类似的。在 tokio 中,可以使用 JoinHandle::abort() 来取消一个 task。task 结构中有一个“cancel marker bit”来跟踪一个任务是否被取消了。如果它发现一个 task 被取消了,就会停止执行这个 task。

rust
// If the task is running, we mark it as cancelled. The thread
// running the task will notice the cancelled bit when it
// stops polling and it will kill the task.
//
// The set_notified() call is not strictly necessary but it will
// in some cases let a wake_by_ref call return without having
// to perform a compare_exchange.
snapshot.set_notified();
snapshot.set_cancelled();

背后的逻辑也很简单,就是 runtime 放弃了继续轮询你的 task,就和 ? 差不多。某种程度上可能更棘手一点,因为我们不能像 Err 那样处理这个 cancellation。不过这代表我们需要考虑每一个 .await 都有可能随时被 cancel 掉吗?这也太麻烦了。

以本文这个 metadata 更新的情况为例,如果把 cancel 纳入考虑范围,我们需要检查文件是否和内存中的状态一致,如果不一致就要回滚持久化的改动等等。坏消息是,在某些方面答案是肯定的,runtime 可以对你的 future 做任何事情。不过好在大多数情况下它们都还是很遵守规矩的。

Current Solution

explicit detach

现在是否有手段能防止 task 被取消呢?在 tokio 中我们可以通过 drop JoinHandle 来 detach 一个任务到后台。一个 detached task 意味着没有前台的 handle 来控制这个任务,从某种意义上来说也就使得其他人不能在外面套一层 timeoutselect,从而间接地使它不会被取消执行。并且开头提到的问题就是通过这种方式解决的。

JoinHandle detaches the associated task when it is dropped, which means that there is no longer any handle to the task, and no way to join on it.

不过虽然有办法能够实现这个功能,是否像 glommio 一样有一个显式的 detach 方法,类似一个不返回 JoinHandlespawn 方法会更好。但这些都是琐碎的事情,一个 runtime 通常不会完全没有理由就取消一个 task,并且在大多数情况下都是出于用户的要求,只不过有时候可能没有注意到,就像 select 中的那些“未选中的分支”或者 tonic 中请求处理的逻辑那样。所以如果我们确定一个 task 是不能被取消的话,显式地 detach 可能能预防某些悲剧的发生。

Our solution

目前为止所有问题都清晰了,让我们开始修复这个 bug 吧!

首先,为什么我们的 future 会被取消呢?通过函数调用链路很容易就能发现整个处理过程都是在 tonic 的请求执行逻辑中就地执行的,而对于一个网络请求来说有一个超时行为是很常见的。解决方案也很简单,就是将服务器处理逻辑提交到另一个 runtime 中执行,从而防止它被取消。只需要几行代码就能完成。

rust
@@ -30,12 +40,24 @@ impl BatchHandler {
            }
            batch_resp.admins.push(admin_resp);

-        for db_req in batch_req.databases {
-            for obj_expr in db_req.exprs {
-                let object_resp = self.query_handler.do_query(obj_expr).await?;
-                db_resp.results.push(object_resp);
+        let (tx, rx) = oneshot::channel();
+        let query_handler = self.query_handler.clone();
+        let _ = self.runtime.spawn(async move {
+            // execute request in another runtime to prevent the execution from being cancelled unexpected by tonic runtime.
+            let mut result = vec![];
+            for db_req in batch_req.databases {
+                for obj_expr in db_req.exprs {
+                    let object_resp = query_handler.do_query(obj_expr).await;
+
+                    result.push(object_resp);
+                }
             }

这个问题到这里就修复完了,不过并不是从根本上解决 async cancellation 带来的 bug,而是采用间接手段去规避任务由于超时而被提前取消的问题,毕竟我们的这些异步逻辑还是需要被完整执行的。

但是这样的处理会放大另外一些问题,比如我们也无法提前取消掉对于已经不用执行或资源消耗特别大的任务,从而导致系统资源的浪费。这些是我们之后需要持续改进的地方。接下来会就这方面继续展开,从 async 生态的方面讨论有哪些可能能提升 async cancellation 的使用体验。

Runtime Behavior

  • marker trait

首先,我们自然希望 runtime 不要无条件地取消我的 task,而是尝试通过类型系统来变得更友好,比如借助类似 CancelSafe 的 marker trait 。对于 cancellation safety 这个词,tokio 在它的文档中有提到:

To determine whether your own methods are cancellation safe, look for the location of uses of .await. This is because when an asynchronous method is cancelled, that always happens at an .await. If your function behaves correctly even if it is restarted while waiting at an .await, then it is cancellation safe.

简单来说就是用来描述一个 task 是否可以安全地被取消掉,这是 async task 的属性之一。tokio 维护了一个很长的列表,列出了哪些是安全的以及哪些是不安全的。看起来这和 UnwindSafe 这个 marker trait 很像。两者都是描述“这种控制流程并不总是被预料到的”,并且“有可能导致一些微妙的 bug”的这样一种属性。

如果有这样一个 CancelSafe 的 trait,我们就有途径可以告诉 runtime 我们的异步任务是否可以安全地被取消掉,同时也是一种方式让用户承诺 cancelling 这个控制流程是被仔细处理过的。如果发现没有实现这个 trait,那就意味着我们不希望这个 task 被取消掉,简单而清晰。以 timeout() 为例:

rust
/// The marker trait
trait CancelSafe {}

/// Only cancellable task can be timeout-ed
pub fn timeout<F>(duration: Duration, future: F) -> Timeout<F> where
    F: Future + CancelSafe
{}
  • volunteer cancel

另一个方式是让任务自愿地取消。就像 Kotlin 中的 cooperative cancellation 一样,它有一个 isActive 方法来检查一个 task 是否被取消掉。这只是一个检测方法,是否要取消完全取决于 task 本身。下面是 Kotlin 文档中的一个例子, cooperative cancellation 发生在第 5 行。这种方式把“隐藏的控制流程”放在了明面上,让我们能以一种更自然的方式来考虑和处理 cancellation,就像 OptionResult 一样。

Kotlin
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
    var nextPrintTime = startTime
    var i = 0
    while (isActive) { // cancellable computation loop
        // print a message twice a second
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

并且我认为这也不难实现,Tokio 现在已经有了 Cancelled bitCancellationToken,只是看起来和期望的还有点不一样。最后还是需要 runtime 把 cancellation 的权利交给 task,否则情况可能没有什么大的不同。

异步取消问题作为 Rust 语言中一个经典问题,暂时还没有很完美的解法。如果你正被这个问题困扰,或者有一些自己的见解,欢迎在评论区留言。期待随着社区生态的共同努力,很快能共建一个更优的解决方案~

关于 Greptime

Greptime 格睿科技专注于为可观测、物联网及车联网等领域提供实时、高效的数据存储和分析服务,帮助客户挖掘数据的深层价值。目前基于云原生的时序数据库 GreptimeDB 已经衍生出多款适合不同用户的解决方案,更多信息或 demo 展示请联系下方小助手(微信号:greptime)。

欢迎对开源感兴趣的朋友们参与贡献和讨论,从带有 good first issue 标签的 issue 开始你的开源之旅吧~期待在开源社群里遇见你!添加小助手微信即可加入“技术交流群”与志同道合的朋友们面对面交流哦~

Star us on GitHub Now: https://github.com/GreptimeTeam/greptimedb

官网:https://greptime.cn/

文档:https://docs.greptime.cn/

Twitter: https://twitter.com/Greptime

Slack: https://greptime.com/slack

LinkedIn: https://www.linkedin.com/company/greptime/

加入我们的社区

获取 Greptime 最新更新,并与其他用户讨论。