`for await` and the battle of buffered streams


Last week, Niko Matsakis pointed out a connection to me between for await and something we’ve taken to calling the “buffered streams problem”. This was inconvenient, because I was in the middle of writing a different blog post about what I want Async Rust to look like in a few years, and it challenged some of my assumptions.

for await in this post is really just shorthand for the most common way of iterating over async streams today. It iterates over the items yielded by the stream one at a time by polling the stream, then executing the body of the for loop. The following two loops are equivalent:

for await item in stream { /* process item */ }

while let Some(item) = stream.next().await { /* process item */ }

As it turns out, this can interact badly with some properties of other primitives defined on streams in the futures crate.

Managing concurrent queries

To see the problem, let’s start with some simple async code. Pretend you’re writing a simple batch job that queries a database for some work items, does further queries for each item, and then uploads the result.

async fn batch_job(db: &Database) {
    let work = run_query(db, FIND_WORK_QUERY).await;
    for item in work {
        let result = run_query(db, work_query(item)).await;
        upload_result(result).await;
    }
}

async fn run_query(db: &Database, query: Query) -> Vec<Row> {
    let conn = db.connect().await;
    conn.select_query(query).await
}

At some point you notice the job is spending a lot of time waiting on these individual queries from the database, but the database is capable of handling a higher load by processing queries in parallel.

You start issuing a few of them concurrently with the buffered combinator, which takes a stream of futures, executes up to N at a time, and returns a stream of their results. upload_result talks to a different service with less capacity, so you serialize that step for now.

async fn batch_job(db: &Database) {
    let work = run_query(db, FIND_WORK_QUERY).await;
    let work_queries = stream::iter(work)
        .map(|item| run_query(db, work_query(item)))
        .buffered(5);
    for await result in work_queries {
        upload_result(result).await;
    }
}

This code looks nice, and it expresses exactly what we want to do. Composing behaviors like this is what an expressive, high-level concurrency API should make possible.

But since you read the beginning of this post, you already know there’s a problem lurking somewhere.

The problem strikes

You happily start running your job in production. One day, you see reports of timeouts inside run_query. These reports indicate that the connection timed out between when it was created and when the query was sent.

Huh? There’s not much happening inside run_query:

async fn run_query(db: &Database, query: Query) -> Vec<Row> {
    let conn = db.connect().await;
    conn.select_query(query).await
}

You do know there are some very big jobs, for which upload_result takes 20-30 seconds. But this shouldn’t be a problem, because upload_result never blocks the thread. And yet…

Does knowing all of this help you spot the problem in this code?

I think it will still elude many people.

What happened?

The problem lies in the flow of execution through our loop. Let’s walk it through.

Since async execution in Rust is poll-driven, nothing happens inside work_queries until we start asking for items. At the beginning of the loop, we await the first result, and start cranking away on those queries:

async fn batch_job(db: &Database) {
    let work = run_query(db, FIND_WORK_QUERY).await;
    let work_queries = stream::iter(work)
        .map(|item| run_query(db, work_query(item)))
        .buffered(5);              // <-- 5 active; polling
    for await result in work_queries {  // <-- YOU ARE HERE
        upload_result(result).await;
    }
}

Our trusty buffered combinator spins up five run_query calls, and we wait here – until the first one finishes. Then execution moves on to the body of the loop.

async fn batch_job(db: &Database) {
    let work = run_query(db, FIND_WORK_QUERY).await;
    let work_queries = stream::iter(work)
        .map(|item| run_query(db, work_query(item)))
        .buffered(5);             // <-- 4 active; paused
    for await result in work_queries {
        upload_result(result).await;  // <-- YOU ARE HERE
    }
}

Notice what happened to our buffered queue of run_query operations: we stopped polling all four remaining active query operations. That means we have to wait until the loop body completes to poll them again, which could be 20-30 seconds for the slowest calls to upload_result. It’s easy for one of our connections to time out here if it isn’t polled promptly after being accepted by the server.
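To see the stall without a database, here is a rough, std-only simulation. Everything in it is a stand-in made up for illustration: Countdown plays the role of a query or an upload, ToyBuffered is a much-simplified imitation of buffered (not the futures crate's implementation), and block_on is a busy-polling toy executor.

```rust
use std::cell::Cell;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Stand-in for a query or an upload: finishes after `remaining` extra polls
/// and records every poll it receives in `polls`.
struct Countdown {
    remaining: u32,
    polls: Rc<Cell<u32>>,
}

impl Future for Countdown {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        self.polls.set(self.polls.get() + 1);
        if self.remaining == 0 {
            return Poll::Ready(());
        }
        self.remaining -= 1;
        Poll::Pending
    }
}

/// Toy imitation of `buffered`: its futures only make progress inside `poll_next`.
struct ToyBuffered {
    in_flight: Vec<Countdown>,
}

impl ToyBuffered {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
        if self.in_flight.is_empty() {
            return Poll::Ready(None);
        }
        for i in 0..self.in_flight.len() {
            if Pin::new(&mut self.in_flight[i]).poll(cx).is_ready() {
                self.in_flight.remove(i);
                return Poll::Ready(Some(()));
            }
        }
        Poll::Pending
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, just enough to run the demo.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Returns how often the slow query had been polled before and after one loop body.
fn slow_query_polls_around_body() -> (u32, u32) {
    let slow_polls = Rc::new(Cell::new(0));
    let mut queries = ToyBuffered {
        in_flight: vec![
            Countdown { remaining: 1_000, polls: Rc::clone(&slow_polls) },
            Countdown { remaining: 0, polls: Rc::default() },
        ],
    };
    block_on(async {
        // Top of the loop: the stream polls both queries; the fast one finishes.
        let first = poll_fn(|cx| queries.poll_next(cx)).await;
        assert!(first.is_some());
        let before = slow_polls.get();
        // Loop body: an "upload" that needs 51 polls of our task to finish.
        let upload = Countdown { remaining: 50, polls: Rc::default() };
        upload.await;
        (before, slow_polls.get())
    })
}
```

Calling slow_query_polls_around_body() returns (1, 1): the slow query is polled once at the top of the loop and not at all during the 51 polls the body takes. In the real job, those missing polls are the 20-30 seconds in which a connection can time out.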

Why did it happen?

Async programming is all about managing concurrent control flow, which this code appears to do elegantly with a mix of language and library primitives. But the result was a subtle and surprising bug. This is not the kind of experience that Rust aims to provide its users.

The problem is that concurrent control flow ends up alternating between one set of sub-tasks (the queries) and another (the upload). It’s as if we had two big cranks and had to run between them, frantically spinning one crank before dashing off to spin the other crank before it stopped.

What we really want is for there to be one crank that drives all the work we happen to be doing right now. This is the job of an executor, which takes an arbitrary number of spawned tasks and their sub-tasks and drives them concurrently. It’s often possible to solve this kind of problem by spawning our queries directly.

In this case (based on a true story), spawning would actually be difficult because spawned tasks must be 'static, and run_query takes a database handle by reference. Let’s assume for now, though, that the fix was straightforward.
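To make the 'static restriction concrete, here is a small sketch built around a toy executor. ToyExecutor, its spawn method, and this simplified Database are invented for illustration and are not any real library's API. Sharing the handle through Rc is just one possible workaround, and it may not fit the real job.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Hypothetical stand-in for the post's `Database` handle.
struct Database {
    name: String,
}

/// Like the post's `run_query`, this borrows the database.
async fn run_query(db: &Database, query: &str) -> String {
    format!("{}: {}", db.name, query)
}

type Task = Pin<Box<dyn Future<Output = ()> + 'static>>;

/// Toy single-threaded executor. It stores tasks and runs them later, so,
/// like `tokio::spawn` and similar functions, it demands `'static` futures.
#[derive(Default)]
struct ToyExecutor {
    tasks: Vec<Task>,
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

impl ToyExecutor {
    fn spawn(&mut self, fut: impl Future<Output = ()> + 'static) {
        self.tasks.push(Box::pin(fut));
    }

    /// Busy-polls every task until all of them have finished.
    fn run(&mut self) {
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        while !self.tasks.is_empty() {
            self.tasks.retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
        }
    }
}

/// Spawns one task per query. Taking `db: &Database` here and borrowing it
/// inside the tasks would be rejected by the `'static` bound on `spawn`,
/// so each task gets its own `Rc` handle instead.
fn spawn_queries(db: Rc<Database>, queries: Vec<String>) -> Vec<String> {
    let results = Rc::new(RefCell::new(Vec::<String>::new()));
    let mut executor = ToyExecutor::default();
    for query in queries {
        let (db, results) = (Rc::clone(&db), Rc::clone(&results));
        executor.spawn(async move {
            let row = run_query(&db, &query).await;
            results.borrow_mut().push(row);
        });
    }
    executor.run();
    results.take()
}
```

The cost shows up in the signature: the caller now has to hand over an Rc<Database> rather than a plain reference, and that change ripples outward through every function that used to borrow the handle.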

Which piece of code do you think was at fault in causing this bug? Was it the responsibility of

  1. run_query for relying on the caller to poll it frequently enough,
    Fix: Spawn a task directly in run_query.
  2. batch_job, for not recognizing the interaction between these combinators and await points,
    Fix: Spawn tasks for each query inside of batch_job.
  3. buffered, for not warning users that this pattern could lead to unexpected behavior,
    Fix: Update documentation.

Or some combination of the above? I don’t find any of these answers satisfying.

(1) is the safest choice. But it’s inefficient in cases where the function is directly awaited, doesn’t support borrowing, and doesn’t work at all in embedded.

A seasoned async Rust expert might answer (2). But I can’t imagine what advice I would give a newcomer to avoid this situation. Perhaps “call spawn as often as you can,” which is really a band-aid and not even good advice, given its lifetime restrictions.

It is possible to build a mental model of these two cranks being turned, but it’s subtle and easy to miss. This feels like exactly the kind of thing Rust should be checking for us.

Another option is (3), blame the combinator. That one might have some merit.

The curse of the pseudo-executor

Inside of buffered is a FuturesOrdered, which is the enabler of this problematic behavior. FuturesOrdered and its sibling FuturesUnordered show up in a lot of stream combinators. They act a lot like executors embedded in your function. They let you add as many futures as you want, run them concurrently, and handle the results as they become ready.

The problem is that unlike a normal executor, “running” and “handling the results” are different operations that must be handled entirely separately by the caller. If these futures were spawned as tasks into an executor, they would instead become peers of the current task and execute concurrently with it.

This difference is why I call these types “pseudo-executors”. It’s what turns the natural alternation in our for await loop into an alternation between two operations that should be running concurrently.

There are at least two advantages to this behavior:

  1. Combinators can avoid interacting with the real executor or knowing about it at all.
  2. The Unordered variants don’t need to allocate any space to store the output. Because they are driven directly by the caller, they can return the next result to the caller as soon as that result is ready.

If you ask me, these advantages are not worth the cost of the footgun they create.

If we blame the combinator, though, it raises the question of which combinator should take its place. buffered is undoubtedly a very useful tool. One option is to replace it, and other combinators like it that make use of Futures[Un]Ordered, with combinators that spawn their sub-tasks onto a real executor.

This would work if we had some kind of Spawn trait standardized across the ecosystem. It would be especially helpful if this API allowed borrowing (while disallowing parallelism), like moro’s scoped_spawn. It’s even possible this can be made to work in embedded applications without an allocator, as long as you statically know the type and the maximum number of futures that need to be polled, as we do here.

If we can make this approach work everywhere, it seems like a good option. There’s a chance it might not work in the embedded case, though, and it relies on a lot of moving parts, like a Spawn trait. Is there another way?

for_each, the hero we deserve?

Another approach is to use internal iteration:

async fn batch_job(db: &Database) {
    let work = run_query(db, FIND_WORK_QUERY).await;
    let work_queries = stream::iter(work)
        .map(|item| run_query(db, work_query(item)))
        .buffered(5);
    work_queries.for_each(async |result| {
        upload_result(result).await;
    }).await;
}

If for_each were a method on Stream (or AsyncIterator) instead of StreamExt, it could be overridden on the type returned by buffered to ensure that all tasks were polled in the background while the stream was iterated.

However, to really gain the reliability benefit, this would have to be the only way to iterate over the stream. No for await, no stream.next().await.
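As a rough, std-only sketch of what such an override might do, the toy below keeps polling the in-flight futures on every poll of the loop body. All names here (Countdown, DrivenBuffer, drive_while, for_each_driven, block_on) are hypothetical stand-ins and not part of the futures crate or any proposed API.

```rust
use std::cell::Cell;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Stand-in for a query or an upload: finishes after `remaining` extra polls
/// and records every poll it receives in `polls`.
struct Countdown {
    remaining: u32,
    polls: Rc<Cell<u32>>,
}

impl Future for Countdown {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        self.polls.set(self.polls.get() + 1);
        if self.remaining == 0 {
            return Poll::Ready(());
        }
        self.remaining -= 1;
        Poll::Pending
    }
}

/// Hypothetical buffer that parks finished results until they are asked for.
/// The outputs here are `()`, so a counter is enough; real outputs need storage.
struct DrivenBuffer {
    in_flight: Vec<Countdown>,
    finished: usize,
}

impl DrivenBuffer {
    /// Polls each in-flight future once and parks the ones that completed.
    fn drive(&mut self, cx: &mut Context<'_>) {
        let mut i = 0;
        while i < self.in_flight.len() {
            if Pin::new(&mut self.in_flight[i]).poll(cx).is_ready() {
                self.in_flight.remove(i);
                self.finished += 1;
            } else {
                i += 1;
            }
        }
    }

    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
        self.drive(cx);
        if self.finished > 0 {
            self.finished -= 1;
            return Poll::Ready(Some(()));
        }
        if self.in_flight.is_empty() { Poll::Ready(None) } else { Poll::Pending }
    }
}

/// Runs `body` to completion, giving the buffer a turn on every poll.
async fn drive_while<B: Future>(buffer: &mut DrivenBuffer, body: B) -> B::Output {
    let mut body = Box::pin(body);
    poll_fn(|cx| {
        buffer.drive(cx);
        body.as_mut().poll(cx)
    })
    .await
}

/// Internal iteration: the combinator owns the loop, so it can keep polling
/// the remaining futures while each body runs.
async fn for_each_driven<F, Fut>(mut buffer: DrivenBuffer, mut body: F)
where
    F: FnMut(()) -> Fut,
    Fut: Future<Output = ()>,
{
    while let Some(item) = poll_fn(|cx| buffer.poll_next(cx)).await {
        drive_while(&mut buffer, body(item)).await;
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, just enough to run the sketch.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Because drive_while gives the buffer a turn every time the body is polled, a slow query keeps making progress during a long upload. The price is the finished field: results that complete while the body is busy have to wait somewhere, which gives up the no-allocation advantage of the Unordered variants described earlier.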

Toward more reliable async primitives

Async Rust has a poll-based model, and this kind of subtle interaction is one consequence of that. This particular case only shows up in streams, which (like synchronous iterators) naturally involve alternating between computing the next element and processing that element.

The other big problem that arises in connection to the poll model is its vulnerability to unexpected cancellation, usually in futures. This is exacerbated by the behavior of the select combinator, as Yosh Wuyts has described.

Neither of these problems implies that async Rust’s poll model is inherently bad. What we need is a library of primitives that interact well with the implications of that model while behaving predictably. This requires more experimentation in the real world.

In the meantime, we should ship the language features that enable this experimentation. I’ll say more about this in the next post.