Modeling condition variables in Swift async/await
In the brave new world of Swift async/await, a problem I sometimes encounter is how to communicate between Tasks. More specifically, how to have one Task wait on a condition to become true as a result of work that’s done by another set of Tasks. In this post I’ll cover how I eventually ended up solving this using something like a condition variable.
Refining the problem scope
There’s more than one way to handle Task synchronization, so I want to start by refining the problem that I’m trying to solve. The hope is it’ll be clearer why I chose the solution I did.
One of the easiest ways to have one Task wait on another’s work is to package up that work in its own Task, then just await that. Here’s an oversimplified example of a pattern that I’ve used:
final actor Thingy {
    private let loadTask: Task<Void, Never>

    init() {
        // Start the (slow) load exactly once and keep a handle to it
        loadTask = Task {
            // ... load the data here ...
        }
    }

    /// User might call this at some point, repeatedly
    func processData() async {
        // Waiting on the task to ensure it's done before continuing
        await loadTask.value
        // Ok, now I can do my processing, knowing the data has loaded
    }
}
In this example, loading the data can take a while and I want to make sure it’s loaded before doing any on-demand processing of it. I also don’t want to do the load repeatedly, so I wrap it up in a Task and set it as a property. Any Task that needs the load to be completed just awaits the value. (This is an extremely contrived example to demonstrate a pattern. Don’t email me how you’d refactor it.) The point here is that awaiting the result of a Task is a nice, straightforward mechanism for synchronizing work between two tasks.
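To make the "load once, await many times" behavior concrete, here is a minimal sketch. The names LoadCounter and CachedLoader are hypothetical and exist only for this illustration; the counter stands in for whatever the real load would do.

```swift
/// Hypothetical helper that records how many times the load actually ran
actor LoadCounter {
    private(set) var loads = 0
    func recordLoad() { loads += 1 }
}

/// Same pattern as above: the load is wrapped in a Task stored as a property
actor CachedLoader {
    private let loadTask: Task<Void, Never>

    init(counter: LoadCounter) {
        // The Task body runs once, no matter how many callers await it
        loadTask = Task { await counter.recordLoad() }
    }

    func processData() async {
        // Every caller waits on the same Task; later callers return
        // right away because the Task has already finished
        await loadTask.value
    }
}
```

Calling processData() several times, from one Task or many, should leave the counter at a single recorded load.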
However, there’s a variation of this problem that I want to solve in this post. It’s not that a specific task has completed, but that a condition has become true. That sounds subtle, but there’s actually a big difference. Multiple Tasks may be working and any one of them might make the condition become true.
For example, suppose I want to limit the number of concurrent Tasks that are downloading images to no more than 10. When a Task starts to download an image, it needs to know if there are fewer than 10 downloads running. If there are 10 or more, it wants to wait until one of the existing Tasks has finished. This is where awaiting a single Task as a solution falls apart. The waiting Task actually wants to know when the concurrency count goes below 10, not when a specific Task completes (although they are correlated). It can’t await all the executing Tasks because it doesn’t need to wait until all Tasks complete. If it awaits a single Task, it might also wait longer than necessary if a different Task from the one it’s awaiting completes first. Of course, this is all ignoring that there could be many Tasks waiting for the concurrency count to go below 10, and how do I ensure only one of them continues when it drops to 9 concurrent?
(Also, I know the classic way to solve the image download concurrency limiting is to use OperationQueue. But in a future post, I’m going to argue that’s not ideal in an async/await world for this problem.)
To summarize: I want a Task to wait on a condition to become true, which isn’t the same as waiting on a Task to complete.
Classic approach
If I was still in the bad old days of pthreads, I’d reach for a condition variable. I’d have the queued Task wait on it, and when the concurrent count dropped below 10, the just-completed download Task would signal the condition variable. But I can’t actually use pthread condition variables because they’d block the underlying system thread, which would tie up one of the threads in Swift’s cooperative pool. Then everybody would just be sad.
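For contrast, here is roughly what the thread-blocking version looks like. This is only a sketch: it uses Foundation’s NSCondition as a stand-in for a raw pthread condition variable, and BlockingLimiter and its members are hypothetical names. The call to condition.wait() parks the calling thread, which is exactly what shouldn’t happen on the cooperative pool.

```swift
import Foundation

/// Hypothetical thread-blocking limiter, shown only for contrast
final class BlockingLimiter {
    private let condition = NSCondition()
    private var running = 0
    private let limit: Int

    init(limit: Int) { self.limit = limit }

    /// Blocks the calling thread until a slot is free
    func acquire() {
        condition.lock()
        defer { condition.unlock() }
        // Re-check in a loop: wakeups can be spurious, and another
        // thread may have taken the slot first
        while running >= limit {
            condition.wait()
        }
        running += 1
    }

    /// Frees a slot and wakes one blocked thread, if any
    func release() {
        condition.lock()
        defer { condition.unlock() }
        running -= 1
        condition.signal()
    }

    /// Current number of occupied slots
    var runningCount: Int {
        condition.lock()
        defer { condition.unlock() }
        return running
    }
}
```

A download thread would call acquire() before starting and release() when done. It works fine with dedicated threads, but not inside a Task.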
However, what if I could construct something similar to a condition variable that stays in async/await land? It wouldn’t actually need to do everything a traditional pthread condition variable does, like making sure only one Task unblocks, because I could handle that manually.
Let me sketch out what that might look like:
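// Interface sketch only; the bodies are stubs
struct Condition {
    /// Suspends the caller until the paired Signal fires
    func wait() async {}
    /// Creates a connected Condition/Signal pair
    static func makeCondition() -> (Condition, Signal) {
        (Condition(), Signal())
    }
}
final class Signal {
    /// Unblocks whoever is waiting on the paired Condition
    func signal() {}
}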
With this approach, the Task that wants to download an image can call into the queue that knows how many downloads are currently going. The queue constructs a Condition/Signal pair, keeps the Signal and returns the Condition to the calling Task. The calling Task then wait()s on that Condition. When the queue decides it’s time for that specific Task to go (it can keep a prioritized array of Signals), it calls signal() on the Signal and the paired Condition unblocks.
This behavior would solve my stated problem: my Task could wait on a condition to become true, instead of waiting on a specific Task.
Implementation
I chose to use an AsyncStream to build this functionality. Basically, Condition.wait() is going to sit in a for await loop waiting on the AsyncStream to complete. Signal.signal() calls finish() on the AsyncStream’s continuation to unblock it.
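The mechanism can be tried in isolation before wrapping it in types. The sketch below uses only AsyncStream; the function names are hypothetical. It also illustrates a property I’m relying on: a finished stream stays finished, so calling finish() before anyone starts waiting should not lose the wakeup.

```swift
/// Suspends until the stream finishes, then reports that it got there
func waitForFinish(_ stream: AsyncStream<Void>) async -> Bool {
    // No values are ever yielded; the loop ends when finish() is called
    for await _ in stream {}
    return true
}

/// finish() is called before the waiter starts iterating
func finishBeforeWait() async -> Bool {
    let (stream, continuation) = AsyncStream<Void>.makeStream()
    continuation.finish()
    return await waitForFinish(stream)
}

/// The waiter is started in its own Task, then finish() releases it
func finishAfterWait() async -> Bool {
    let (stream, continuation) = AsyncStream<Void>.makeStream()
    let waiter = Task { await waitForFinish(stream) }
    continuation.finish()
    return await waiter.value
}
```

Both functions should return true rather than hang, whichever side gets there first.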
/// Signal is used in conjunction with Condition. Together they allow
/// one Task to wait on another Task.
public final class Signal {
    private let stream: AsyncStream<Void>.Continuation

    /// Private init, don't call directly. Instead, use Condition.makeCondition()
    fileprivate init(stream: AsyncStream<Void>.Continuation) {
        self.stream = stream
    }

    /// Signal the waiter (who has the Condition) that they're good to go
    public func signal() {
        stream.finish()
    }
}

/// Condition allows two async Tasks to coordinate. Use `makeCondition()` to
/// create a Condition/Signal pair. The Task that wants to wait on something to
/// happen takes the Condition, the Task that notifies of the condition takes
/// the Signal.
public struct Condition {
    private let waiter: () async -> Void

    /// Private init; takes a closure that can be waited on
    fileprivate init(waiter: @escaping () async -> Void) {
        self.waiter = waiter
    }

    /// Wait on the condition to become true
    public func wait() async {
        await waiter()
    }

    /// Construct a Condition/Signal pair. The Task that wants to wait on something to
    /// happen takes the Condition, the Task that notifies of the condition takes
    /// the Signal.
    public static func makeCondition() -> (Condition, Signal) {
        // AsyncStream.makeStream() requires Swift 5.9 or later
        let (stream, continuation) = AsyncStream<Void>.makeStream()
        let condition = Condition {
            // No values are ever yielded; the loop ends when finish() is called
            for await _ in stream {}
        }
        let signal = Signal(stream: continuation)
        return (condition, signal)
    }
}
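To tie this back to the download example, here is one way the queue side might look. Treat it as a sketch under assumptions, not the finished queue: DownloadLimiter, reserve(), release(), and run(_:) are hypothetical names, waiters are released in simple first-in-first-out order rather than by priority, and the Condition and Signal here are condensed, Sendable stand-ins for the pair above so the block compiles on its own.

```swift
// Condensed stand-ins for the Condition/Signal pair
final class Signal: Sendable {
    private let continuation: AsyncStream<Void>.Continuation
    init(continuation: AsyncStream<Void>.Continuation) {
        self.continuation = continuation
    }
    func signal() { continuation.finish() }
}

struct Condition: Sendable {
    private let stream: AsyncStream<Void>
    init(stream: AsyncStream<Void>) { self.stream = stream }
    func wait() async { for await _ in stream {} }
    static func makeCondition() -> (Condition, Signal) {
        let (stream, continuation) = AsyncStream<Void>.makeStream()
        return (Condition(stream: stream), Signal(continuation: continuation))
    }
}

/// Hypothetical queue that allows at most `limit` pieces of work at once
actor DownloadLimiter {
    private let limit: Int
    private var running = 0
    private var waiting: [Signal] = []

    init(limit: Int) { self.limit = limit }

    /// Takes a slot if one is free; otherwise returns a Condition to wait on
    private func reserve() -> Condition? {
        if running < limit {
            running += 1
            return nil
        }
        let (condition, signal) = Condition.makeCondition()
        waiting.append(signal)
        return condition
    }

    /// Gives the slot back, or hands it straight to the oldest waiter
    private func release() {
        if waiting.isEmpty {
            running -= 1
        } else {
            // The slot changes owner, so `running` stays the same
            waiting.removeFirst().signal()
        }
    }

    /// Runs `work` once a slot is available
    func run<T: Sendable>(_ work: @Sendable () async -> T) async -> T {
        if let condition = reserve() {
            await condition.wait()
        }
        let result = await work()
        release()
        return result
    }
}
```

Because the actor decides which Signal to fire, exactly one waiter continues each time a slot frees up. One caveat I’m glossing over: iterating an AsyncStream also ends when the waiting Task is cancelled, so wait() can return early on cancellation, and a production version would need to account for that.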
Conclusion
In this post I described a variation of a Task synchronization problem. In this variation a Task wants to wait on a condition to become true, as opposed to a Task being completed. I then introduced a simplified version of a traditional synchronization mechanism called a condition variable as a mechanism for solving this problem. Finally, I demonstrated a working async/await solution using AsyncStream.