Archive for April, 2024

Reading and writing files in Swift async/await

Because of habits ingrained in me, by default I tend to reach for synchronous, blocking APIs when reading and writing data to and from disk. This causes problems with Swift’s cooperatively scheduled Tasks. In this post, I examine the various async-safe approaches I’ve discovered for hitting disk, and end with the general approach I settled on.

The synchronous way

If I’m loading up data from disk to decode from JSON or decode into a UIImage, then my go-to has been:


let data = try Data(contentsOf: fileURL)

Similarly, if I’m writing said JSON or image data out to disk, I’ll use write:


try data.write(to: fileURL)

These are simple and easy to use. They also create problems with Swift async/await.

When I look at the Swift Foundation implementation of these methods, I can see that by default they do blocking I/O operations. This is a problem because Tasks are cooperatively scheduled over kernel threads. (I know this is probably an over-simplification, but stick with me.) If a Task traps to the kernel on a blocking call (like the blocking I/O calls), the kernel doesn’t know anything about the Task; it only sees the system thread blocking, and therefore suspends it. This robs Swift’s async runtime of the opportunity to simply suspend the Task making the I/O call and pick up a different Task that is ready to run. In other words, it takes away one of the system threads Swift async/await can use for the duration of the blocking I/O call.

So while simple, it would probably be best if I didn’t use them in the async/await world.

Option 1: side step the issue via memory mapping

If all I care about is reading in data, I can consider asking Data to memory map the file. Depending on the access patterns of the end user, this might actually be the fastest way anyway. The reads become implicit: whenever someone touches the contents of the Data, the kernel faults in the appropriate pages.


let data = try Data(contentsOf: fileURL, options: .mappedIfSafe)

A caveat is that not all files can be memory mapped. There’s also no automatic way to use memory mapping for writes via the Data APIs. While I could roll my own, I’m not sure what the benefit would be.

Option 2: use URLSession

The next option I discovered was leveraging URLSession to do asynchronous reads from file URLs.

Like this:


let (data, _) = try await URLSession.shared.data(from: fileURL)

For reading the entire file in one go, this works well. Unfortunately, I couldn’t find a way to “upload” to a file URL. Probably for good reason. So this is a read-only solution.

Quick aside about async URLSession: you probably don’t want to use the bytes method.

Option 3: wrap up FileHandle

This is the first solution I found that handles both asynchronous reads and writes. The basic principle is simple: create a FileHandle then use the readabilityHandler or writeabilityHandler properties to either read data out or write data into it. For reading, I returned an AsyncStream<Data> that the readabilityHandler yields to.

Side note: I do not recommend using the bytes property to iterate one byte at a time.

I got really far with this approach and had it working. However, for files, I found it clunky. Namely:

  1. When reading, I had to get the size of the file first so I knew when the readabilityHandler got called the last time. A lot of sample code assumes that availableData will be empty on the last call. However, in my testing, I found that was true for Pipes but not for files.
  2. When writing, the call to write was still synchronous. Since I was doing it inside the writeabilityHandler, presumably it wouldn’t actually block, but that’s not really clear.
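To give a feel for its shape, here’s a rough sketch of the read side. This isn’t my exact code; the readFile(at:) name is mine, purely for illustration.


import Foundation

/// A sketch of wrapping FileHandle's readabilityHandler in an AsyncStream.
/// readFile(at:) is an illustrative name, not a Foundation API.
func readFile(at url: URL) throws -> AsyncStream<Data> {
    // Caveat 1: capture the file size up front so we can detect the
    //  final readabilityHandler call.
    let fileSize = try FileManager.default
        .attributesOfItem(atPath: url.path)[.size] as? Int ?? 0
    let handle = try FileHandle(forReadingFrom: url)

    return AsyncStream { continuation in
        var totalRead = 0
        handle.readabilityHandler = { handle in
            let data = handle.availableData
            totalRead += data.count
            if !data.isEmpty {
                continuation.yield(data)
            }
            // For files, availableData is not reliably empty on the last
            //  call, so compare against the size captured above.
            if data.isEmpty || totalRead >= fileSize {
                handle.readabilityHandler = nil
                continuation.finish()
            }
        }
    }
}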

After getting FileHandle to work, I decided it was too clunky and went for a different approach. But it is an option.

Option 4: DispatchIO

This is the approach I ended up using. DispatchIO is low-level, but its API was consistent and straightforward to wrap up. The basic principle is to use DispatchIO to get non-blocking reads/writes and then wrap those in checked continuations. That way Swift async/await knows when to suspend or resume the calling Task. Below I go into detail on my implementation. As an added bonus on top of the async/await, this is my first go at a non-copyable type.

I’ll start with the type declarations:


/// A phantom type used by AsyncFileStream to restrict methods to read mode
public enum ReadMode {}
/// A phantom type used by AsyncFileStream to restrict methods to write mode
public enum WriteMode {}

public struct AsyncFileStream<Mode>: ~Copyable {
    // ...
}

The actual type of AsyncFileStream is complicated by the fact that I’m trying to make invalid operations impossible for the caller. I achieve this in two ways:

  1. I use a phantom type called Mode that can either be ReadMode or WriteMode. Read methods are only available when Mode == ReadMode, and write methods only when Mode == WriteMode.
  2. I mark AsyncFileStream as non-copyable with ~Copyable. This means there can be only one copy, preventing multiple Tasks/threads/whatever from calling it at the same time. It also means I get a nice deinit method to clean up in, and I can mark my close() method as consuming, so the instance can’t be used after close() is called; if it is, the compiler emits an error at compile time.

AsyncFileStream has the properties you’d probably expect if you’ve used DispatchIO before:


/// The queue to run the operations on
private let queue: DispatchQueue
/// The unix file descriptor for the open file
private let fileDescriptor: Int32
/// The DispatchIO instance used to issue operations
private let io: DispatchIO
/// Whether the file has been closed; used to prevent a double close()
private var isClosed = false

If you haven’t used DispatchIO before, don’t worry, these properties will make sense when I use them.

Creating an instance is actually the most involved part of this whole type, but it’s a good place to start:


fileprivate init(url: URL, mode: Int32) throws {
    guard url.isFileURL else {
        throw AsyncFileStreamError.notFileURL
    }
    // Since we're reading/writing as a stream, keep it a serial queue
    let queue = DispatchQueue(label: "AsyncFileStream")
    let fileDescriptor = open(url.absoluteURL.path, mode, 0o666)
    // Once we start setting properties, we can't throw. So check to see if
    //  we need to throw now, then set properties
    if fileDescriptor == -1 {
        throw AsyncFileStreamError.openError(errno)
    }
    self.queue = queue
    self.fileDescriptor = fileDescriptor
    io = DispatchIO(
        type: .stream,
        fileDescriptor: fileDescriptor,
        queue: queue,
        cleanupHandler: { [fileDescriptor] error in
            // Unfortunately, we can't seem to do anything with `error`.
            // There are no guarantees when this closure is invoked, so
            //  the safe thing would be to save the error in an actor
            //  that the AsyncFileStream holds. That would allow the caller
            //  to check for it, or the read()/write() methods to check
            //  for it as well. However, having an actor as a property
            //  on a non-copyable type appears to uncover a compiler bug.

            // Since we opened the file, we need to close it
            Darwin.close(fileDescriptor)
        }
    )
}

First, notice the init is fileprivate. I’m going to provide much more ergonomic APIs for creating an instance than requiring callers to know how to fill out mode correctly. But there’s a lot of shared code, so this init takes care of that. Error handling in the init of a non-copyable type is tricky. I can only throw errors up to the point that it starts setting up stored properties. Once that starts, the init must guarantee success (i.e. no throws after) or it’s a compile-time error. So early on I verify I have a file URL and that the open happened successfully.

Otherwise, I set up DispatchIO. I create my DispatchQueue to execute the I/O operations on. Since I’m going to be reading and writing as a stream, I might as well keep it a serial queue. I open the file using the POSIX open() API with the passed-in mode, and set the permissions to 0o666, which should give read/write to all. As mentioned before, I create DispatchIO as a .stream since I’m going to sequentially read through the file or write it out.

The cleanupHandler is another tricky bit. It’s called once DispatchIO is done with our fileDescriptor, e.g. after someone’s called close(). Since we opened the file, we take care of closing it. I’m also given an error as a parameter to my closure; if it’s non-zero, something went wrong. Unfortunately, I ran into a compiler bug (according to the compiler itself) in trying to handle it.

A slight diversion about said compiler bug

What I’d like to do to handle the cleanupHandler error is to have a private actor like this:


final actor AsyncError {
    private(set) var error: AsyncFileStreamError?

    func setError(_ error: AsyncFileStreamError) {
        self.error = error
    }
}

I’d always create an instance in the init and keep it as a property on AsyncFileStream. Inside of my cleanupHandler I could set the actual error on it:


cleanupHandler: { [fileDescriptor] error in
    if error != 0 {
        Task {
            // Wrap the errno in the error type (the case name here is
            //  illustrative)
            await asyncError.setError(.closeError(error))
        }
    }
    // Since we opened the file, we need to close it
    Darwin.close(fileDescriptor)
}

Since it’s a property, the read and write methods could check it in a guard statement or the caller could explicitly read it off to check for an error. Unfortunately, when I actually implement this, attempting to touch the actor instance leads to:

Usage of a noncopyable type that compiler can't verify. This is a compiler bug. Please file a bug with a small example of the bug

Welp.

Moving on…

Creating an instance

I wanted to be able to create an instance of an AsyncFileStream without remembering all the various mode flags for open(). Also, I added the Mode phantom type, but I’d rather it be invisible to the caller, i.e. they shouldn’t have to type out the Mode explicitly somewhere. If I used a direct init, though, they would, as there’s no way to infer it. For those reasons, I added an extension to URL to create AsyncFileStream:


public extension URL {
    /// Create an instance from the URL for reading only
    func openForReading() throws -> AsyncFileStream<ReadMode> {
        try AsyncFileStream<ReadMode>(url: self, mode: O_RDONLY)
    }

    /// Create an instance from the URL for writing. It will overwrite if the file
    /// already exists or create it if it does not exist.
    func openForWriting() throws -> AsyncFileStream<WriteMode> {
        try AsyncFileStream<WriteMode>(url: self, mode: O_WRONLY | O_TRUNC | O_CREAT)
    }
}

If I have a URL, I just have to call openForReading() or openForWriting(). Does what it says on the tin.
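For example (the paths here are purely illustrative):


// Illustrative paths; any file URL works.
let reader = try URL(fileURLWithPath: "/tmp/input.bin").openForReading()
let writer = try URL(fileURLWithPath: "/tmp/output.bin").openForWriting()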

Cleaning up safely

Next, I’ll cover the parts of the type that are available regardless of Mode, namely closing the file. Because this is a non-copyable type, close() gets interesting (in a good way):


/// Close the file. Consuming method
public consuming func close() {
    isClosed = true
    io.close()
}

The body is a bit boring — it marks itself as closed so deinit doesn’t close again, and then closes the DispatchIO object. The fact that this is marked consuming is interesting though. Specifically, in that position, it means self is consumed. Which means the caller can’t call any methods after it without getting a compiler error. For example:


let stream: AsyncFileStream<ReadMode> // ... assume initialized
stream.close() // stream is now consumed

try await stream.readToEnd() // this is a compiler error!

That’s pretty neat! I’ve prevented an illegal operation at compile time.

To circle back to why I mark isClosed as true so deinit doesn’t try to close again: in a consuming method, I could, if the type qualifies, call discard self. That destroys self and means deinit doesn’t get called. Unfortunately, it only works if the type contains only “trivial” types, and AsyncFileStream contains non-trivial types, apparently. From what I can discern, “trivial” means you can do a bit-by-bit copy on the type, with no reference counting. In any case, since I can’t discard self, I need to mark something so deinit knows not to call io.close() a second time.
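For illustration, here’s what close() would look like if AsyncFileStream qualified (it doesn’t, so this is hypothetical):


// Hypothetical: discard self only compiles when the type's stored
//  properties are all "trivial", which AsyncFileStream's are not.
public consuming func close() {
    io.close()
    discard self // destroys self without running deinit, so no isClosed flag
}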

Speaking of deinit:


deinit {
    // Ensure we've closed the file if we're going out of scope
    if !isClosed {
        io.close()
    }
}

Since I’ve got a non-copyable type, I get a deinit, which is just the bee’s knees. If I haven’t closed the file already, I do so now so I don’t leak anything.

Reading

If I create the type in ReadMode, there are a couple of read methods available.


public extension AsyncFileStream where Mode == ReadMode {
    /// Read the entire contents of the file in one go
    func readToEnd() async throws -> DispatchData { ... }

    /// Read the next `length` bytes.
    func read(upToCount length: Int) async throws -> DispatchData { ... }
}

Like marking close() as consuming, the goal of putting the methods in a conditional extension is to prevent misuse. Unless I open AsyncFileStream in ReadMode, the read methods simply aren’t available, so calling them is caught at compile time.

As mentioned at the start of this section, the basic idea of read is to wrap DispatchIO’s read in a continuation:


/// Read the next `length` bytes.
func read(upToCount length: Int) async throws -> DispatchData {
    try await withCheckedThrowingContinuation { continuation in
        var readData = DispatchData.empty
        io.read(offset: 0, length: length, queue: queue) { done, data, error in
            if let data {
                readData.append(data)
            }
            guard done else {
                return // not done yet
            }
            if error != 0 {
                continuation.resume(throwing: AsyncFileStreamError.readError(error))
            } else {
                continuation.resume(returning: readData)
            }
        }
    }
}

This is straightforward: wrap up io.read() in a checked throwing continuation. For my purposes I built up a single DispatchData and returned it when finished. That’s because I want to use the entire contents as a single Data instance. However, I could have returned an AsyncStream<DispatchData> and streamed out the data as I got it in the callback. I’ll also note DispatchIO explicitly marks the read as done with a done flag. Glaring at you, FileHandle.

Also note that since I’m using the stream mode of DispatchIO, the offset parameter is ignored, which is why it’s always 0.
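For what it’s worth, here’s a sketch of that streaming alternative, assuming the same io and queue properties (the method name is hypothetical):


/// Hypothetical streaming variant: yields chunks as DispatchIO produces
/// them. Error handling is elided; a real version might use
/// AsyncThrowingStream to surface a non-zero error code.
func readStream(upToCount length: Int) -> AsyncStream<DispatchData> {
    AsyncStream { continuation in
        io.read(offset: 0, length: length, queue: queue) { done, data, error in
            if let data, !data.isEmpty {
                continuation.yield(data)
            }
            if done {
                continuation.finish()
            }
        }
    }
}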

Since my goal was to read the entire file at once, I added a helper method to do that:


/// Read the entire contents of the file in one go
func readToEnd() async throws -> DispatchData {
    try await read(upToCount: .max)
}

It just calls through to the normal read().

Writing

Like reading, writing is gated on the Mode. Writing also works the same way as reading: I wrap up a non-blocking call that takes a callback in a checked throwing continuation. Ugly, but straightforward and effective (like me!):


public extension AsyncFileStream where Mode == WriteMode {
    /// Write the data out to file async
    func write(_ data: DispatchData) async throws {
        try await withCheckedThrowingContinuation { continuation in
            io.write(
                offset: 0,
                data: data,
                queue: queue
            ) { done, _, error in
                guard done else {
                    return // not done yet
                }
                if error != 0 {
                    continuation.resume(throwing: AsyncFileStreamError.writeError(error))
                } else {
                    continuation.resume(returning: ())
                }
            }
        } as Void
    }
}

The main difference from the structure of the read is I ignore the second parameter in the callback, which is the data remaining to write out.

Some Data convenience

Finally, I get back to where I started. Namely, I want convenience methods on Data that read and write entire files, but asynchronously. All I do is wrap up the methods I just defined:


public extension Data {
    /// Asynchronously read from the contents of the fileURL. This method
    /// will throw an error if it's not a file URL.
    init(asyncContentsOf url: URL) async throws {
        let stream = try url.openForReading()
        self = try await Data(stream.readToEnd())
    }

    /// Asynchronously write the contents of self into the fileURL.
    func asyncWrite(to url: URL) async throws {
        // This line makes me sad because we're copying the data. I'm not
        //  currently aware of a way to not copy these bytes.
        let dispatchData = withUnsafeBytes { DispatchData(bytes: $0) }
        let stream = try url.openForWriting()
        try await stream.write(dispatchData)
    }
}

The one thing of note is that in the asyncWrite() method I make a copy of the bytes when I instantiate DispatchData. I would really like to not have to do that, but I couldn’t find a way. If you know one, please tap on the “Contact” button at the top of this page and send me an email, or message me on Mastodon.
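Usage ends up mirroring the synchronous originals. A quick sketch (the path is illustrative):


// Round-trip some bytes through the new async conveniences.
let fileURL = URL(fileURLWithPath: "/tmp/payload.json") // illustrative path
let payload = Data("hello".utf8)
try await payload.asyncWrite(to: fileURL)
let roundTripped = try await Data(asyncContentsOf: fileURL)
assert(roundTripped == payload)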

And that’s it!

Conclusion

In this post, I started with a couple of very convenient methods on Data that make it easy to read or write entire files. However, I pointed out they’re synchronous, which can cause problems in Swift’s async/await world. I then walked through various async-safe ways of reading and writing files. Finally, I built out a general solution for async file I/O by wrapping up DispatchIO in checked continuations.

Swift async/await: do you even need a queue?

Alternative title: A concurrency limiter for Swift async/await

This post is really for me and my future self to help clarify my thinking about modeling asynchronous operations in Swift async/await. But I suppose you can read along as well.

In the world prior to async/await I used dispatch queues and OperationQueues to model asynchronous operations. As a result, my thinking about async naturally reverts to the hammer of execution queues. However, I’m starting to realize execution queues are sometimes not a good fit for modeling async operations in Swift async/await. In this post, I’m going to walk through an example where I found that an execution queue wasn’t what I wanted. Instead I wanted to simply limit the number of Tasks that could concurrently pass through a certain block of code. I’ll end with how I solved the limiter problem.

An example of my broken thinking

To return to my previous post’s example, let’s say I’m building an image cache. Part of the caching infrastructure is to actually download the image from the network. Since I can overload the network if I try to download too many images at once, I want to limit this part to ensure only so many download operations are happening concurrently.

The “classic” way to handle the limiting is to schedule the image download operation on an OperationQueue and make sure it has its maxConcurrentOperationCount set appropriately. Therefore, naturally, I turned to execution queues as a way to solve my concurrent downloads problem. I tried to build a Swift async/await “native” version of a concurrent execution queue. I got something that “mostly” worked, for some definition of “mostly”. And “worked”. But what I couldn’t make work was a clean way to get the downloaded image back to the caller.
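For reference, the classic OperationQueue shape I was trying to recreate looks something like this sketch (the count is illustrative):


import Foundation

// The pre-async/await shape, for contrast.
let downloadQueue = OperationQueue()
downloadQueue.maxConcurrentOperationCount = 4

downloadQueue.addOperation {
    // Download the image here. Getting the result back to whoever asked
    //  means completion handlers or KVO on the Operation, which is
    //  exactly what async/await was supposed to save us from.
}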

When I run into difficulties like this, I stop and try to figure out why it’s difficult. Usually it’s because there’s a mismatch between my mental models, and I’m trying to force a square peg into a round hole.

For my image cache, each fetch image operation is modeled as a Task. In pseudo-code something like this:


Task.detached {
    if let image = await memoryCache.image(for: url) {
        return image
    }
    if let image = await diskCache.image(for: url) {
        return image
    }

    downloadQueue.run {
        // This block actually runs in a Task created by the Queue
        let image = await network.downloadImage(for: url)

        // TODO: how do I get `image` back to the calling Task?
    }

    //...
}

Things flow well until I need to actually download the image and want to limit how many concurrent downloads there are via the downloadQueue. How do I get the image back to the calling Task? I mean, I guess I might figure out a way to make a callback work? That feels super janky, and possibly not safe.

However, looking at this code is when I first realized a couple of things. First, I was switching execution contexts to do the download. Why? I have a perfectly good detached Task already that I would prefer to be executing in. Tasks are supposed to be “recipes”, i.e. a list of steps to perform, which should include the actual download. So, I didn’t need the execution part of an execution queue. Second, what I actually needed it for was limiting the number of concurrent operations; that was its real purpose.

Therefore, my actual problem is I need a way to limit how many concurrent Tasks are calling await network.downloadImage(for: url) at the same time. Can I solve that problem instead?

A concurrency limiter

I’m going to solve this problem of queues with a queue. Obviously. The difference is this queue won’t provide any execution. The calling Task will do that.

I like starting with what I’d like to have at the call site:


Task.detached {
    if let image = await memoryCache.image(for: url) {
        return image
    }
    if let image = await diskCache.image(for: url) {
        return image
    }

    let image = await downloadLimiter.run {
        // This runs on the calling task. No extra tasks!
        await network.downloadImage(for: url)
    }

    //...
}

The key bit is that the closure passed to run is executed on the calling Task. That makes returning the image back trivial. A call site like the above example would imply an interface something like:


public final class ConcurrencyLimiter {
    /// Concurrency is the maximum number of concurrent blocks to allow to run
    public init(concurrency: Int) 

    /// Execute the given block on the calling Task. It may wait first if there
    /// are already the maximum blocks running.
    public func run<Value>(_ block: @escaping () async throws -> Value) async throws -> Value
}

It has an init so it can know the maximum number of concurrent blocks, and then just a run method that executes the passed-in closure. There’s also a non-throwing variant of run, but it’s a simplified version of the throwing one, so it’s less interesting (I’ll show it after the throwing version).

I’ll start on the outside and work my way inside to the details of the implementation. Here’s how I built run:


public func run<Value>(_ block: @escaping () async throws -> Value) async throws -> Value {
    let condition = await counter.enterLimiting()
    await condition.wait()

    do {
        let value = try await block()
        await counter.exitLimiting()
        return value
    } catch {
        await counter.exitLimiting()
        throw error
    }
}

First things first: counter is an actor owned by the ConcurrencyLimiter. It does all the… counting… of how many tasks are running at one time. It’ll also decide which Task goes next. So maybe I should have called it Scheduler? This is the chaos that happens when you don’t have code reviews. smh.

run calls into counter to ask to enter the limited section. The counter returns a Condition which the calling Task immediately waits on. The Condition is described in my previous post, and is what allows Counter to control which Task goes next. Once the Task returns from the call to condition.wait(), it executes its code. Before returning or throwing, it calls back into counter via exitLimiting() to let it know this Task is done.
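For completeness, here’s roughly what the non-throwing variant mentioned earlier looks like. It’s the same enter/wait/exit bookkeeping, minus the do/catch (a sketch):


/// Non-throwing variant of run; same shape without the error path.
public func run<Value>(_ block: @escaping () async -> Value) async -> Value {
    let condition = await counter.enterLimiting()
    await condition.wait()

    let value = await block()
    await counter.exitLimiting()
    return value
}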

That’s it for run. Most of the heavy lifting is in Counter so I’ll look at that next.


final actor Counter {
    /// The maximum amount of concurrency allowed
    private let concurrency: Int
    /// How many blocks are currently in flight
    private var inflightCount = 0
    /// Pending (blocked) blocks in a FIFO queue.
    private var pending = [Signal]()

    init(concurrency: Int) {
        self.concurrency = concurrency
    }
}

There’s not much data needed by the scheduler. Um… I mean Counter. Counter. It has the maximum number of concurrent tasks allowed, how many are actually in flight right now, and then a FIFO array of the Signals that will unblock the waiting Tasks. That’s it.

enterLimiting() is called by the Task when it wants to enter the limited section:


func enterLimiting() -> Condition {
    let shouldWait = inflightCount >= concurrency

    let (condition, signal) = Condition.makeCondition()
    if shouldWait {
        pending.append(signal)
    } else {
        // immediately signal and let it run
        inflightCount += 1
        signal.signal()
    }

    return condition
}

This method is in an actor, and there are no awaits in here, so this method happens without reentrancy. That’s important. It computes whether the caller can immediately execute or should wait. Either way, it creates a Condition/Signal pair so it can return the Condition. If the calling Task can execute immediately, it acts like it has already started by incrementing the inflight count and pre-signaling (i.e. unblocking) the Condition. If the calling Task needs to wait, the Signal is appended to the end of the waiting queue.

The other end of the process is when the calling Task leaves the limited section.


func exitLimiting() {
    inflightCount -= 1
    let shouldUnblock = inflightCount < concurrency

    guard shouldUnblock, let firstPending = pending.first else {
        return
    }
    pending.removeFirst()
    inflightCount += 1
    firstPending.signal()
}

It immediately updates the inflight count and determines if it can unblock any of the pending Tasks. If it can, and there are Tasks waiting, it counts the next Task as started and unblocks it by signaling that Task’s Condition.

You might have noticed this is a simple FIFO queue for prioritization. However, really any prioritization scheme could be substituted in.

Finally, I put a little bit of “what if” insurance in the deinit of the actor. Basically, what happens if the Counter goes out of scope, but there are still Tasks blocked on it?


deinit {
    let localPending = pending
    pending.removeAll()
    for local in localPending {
        local.signal()
    }
}

I make a local copy because I’m just extra paranoid (probably don’t need that), then go through and unblock everything. Presumably no callers care anymore if we’re going out of scope, but I don’t want to leave any Tasks blocked.

(As an aside, I remember from my undergrad days this kind of data structure being called a “monitor.” But Wikipedia says I’m wrong.)

Conclusion

In this post, I described my learning journey where I realized that I didn’t need an execution queue like OperationQueue for my particular problem. Instead, I wanted to let each Task bring its own execution, and just needed a queue to limit the number of concurrent executions. Finally, I demonstrated a simple implementation of a ConcurrencyLimiter using the Condition/Signal pair I introduced in my previous post.

Modeling condition variables in Swift async/await

In the brave new world of Swift async/await, a problem I sometimes encounter is how to communicate between Tasks. More specifically, sometimes I need one Task to wait on a condition becoming true as a result of work that’s done by another set of Tasks. In this post I’ll cover how I eventually ended up solving this using something like a condition variable.

Refining the problem scope

There’s more than one way to handle Task synchronization, so I want to start by refining the problem that I’m trying to solve. The hope is it’ll be clearer why I chose the solution I did.

One of the easiest ways to have one Task wait on another’s work is to package up that work in its own Task, then just await that. Here’s an oversimplified example of a pattern that I’ve used:


final actor Thingy {
    private let loadTask: Task<Void, Never>

    /// User might call this at some point, repeatedly
    func processData() async {
        // Waiting on the task to ensure it's done before continuing
        await loadTask.value

        // Ok, now I can do my processing safe that we've loaded
    }
}

In this example, loading the data can take a while and I want to make sure it’s loaded before doing any on-demand processing of it. I also don’t want to do the load repeatedly, so I wrap it up in a Task and set it as a property. Any Task that needs the load to be completed just awaits the value. (This is an extremely contrived example to demonstrate a pattern. Don’t email me about how you’d refactor it.) The point here is that awaiting the result of a Task is a nice, straightforward mechanism for synchronizing work between two Tasks.

However, there’s a variation of this problem that I want to solve in this post. It’s not waiting for a specific task to complete, but for a condition to become true. That sounds subtle, but there’s actually a big difference. Multiple Tasks may be working, and any one of them might make the condition become true.

For example, suppose I want to limit the number of concurrent Tasks that are downloading images to be no more than 10. When a Task starts to download an image, it needs to know if there are fewer than 10. If there are 10 or more, it wants to wait until one of the existing Tasks has finished. This is where awaiting on a single Task as a solution falls apart. The waiting Task actually wants to know when the concurrency count goes below 10, not when a specific Task completes (although they are correlated). It can’t await all the executing Tasks because it doesn’t need to wait until all Tasks complete. If it awaits a single Task, it also might wait longer than necessary if a different Task from the one it’s awaiting completes first. Of course, this is all ignoring that there could be many Tasks waiting for the concurrency count to go below 10, and how do I ensure only one of them continues when it drops to 9 concurrent?

(Also, I know the classic way to solve the image download concurrency limiting is to use OperationQueue. But in a future post, I’m going to argue that’s not ideal in an async/await world for this problem.)

To summarize: I want a Task to wait on a condition becoming true, which isn’t the same as waiting on a Task to complete.

Classic approach

If I were still in the bad old days of pthreads, I’d reach for a condition variable. I’d have the queued Task wait on it, and when the concurrent count dropped below 10, the just-completed download Task would signal the condition variable. But I can’t actually use pthread condition variables because they’d block the underlying system thread, which would tie up one of the thread resources used by Swift’s cooperative pool. Then everybody would just be sad.

However, what if I could construct something similar to a condition variable that stays in async/await land? It wouldn’t actually need to do everything a traditional pthread condition variable does, like making sure only one Task unblocks, because I could handle that manually.

Let me sketch out what that might look like:


struct Condition {
    func wait() async {}

    static func makeCondition() -> (Condition, Signal)
}

final class Signal {
    func signal() {}
}

With this approach, the Task that wants to download an image can call into the queue that knows how many downloads are currently going. The queue constructs a Condition/Signal pair, keeps the Signal, and returns the Condition to the calling Task. The calling Task then wait()s on that Condition. When the queue decides it’s time for that specific Task to go (it can keep a prioritized array of Signals), it calls signal() on the Signal, and the paired Condition unblocks.

This behavior would solve my stated problem: my Task could wait on a condition to become true, instead of waiting on a specific Task.
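Schematically, a Condition/Signal pair would be used like this:


// Schematic usage: one Task waits on the Condition, another signals.
let (condition, signal) = Condition.makeCondition()

Task {
    await condition.wait()   // suspends here...
    print("condition became true; continuing")
}

signal.signal()              // ...until the Signal fires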

Implementation

I chose to use an AsyncStream to build this functionality. Basically, the Condition.wait() is going to sit in an async/await loop waiting on the AsyncStream to complete. The Signal.signal() calls finish() on the AsyncStream‘s continuation to unblock it.


/// Signal is used in conjunction with Condition. Together they allow
/// one Task to wait on another Task.
public final class Signal {
    private let stream: AsyncStream<Void>.Continuation

    /// Private init, don't call directly. Instead, use Condition.makeCondition()
    fileprivate init(stream: AsyncStream<Void>.Continuation) {
        self.stream = stream
    }

    /// Signal the waiter (who has the Condition) that they're good to go
    public func signal() {
        stream.finish()
    }
}

/// Condition allows two async Tasks to coordinate. Use `makeCondition()` to
/// create a Condition/Signal pair. The Task that wants to wait on something to
/// happen takes the Condition, the Task that notifies of the condition takes
/// the Signal.
public struct Condition {
    private let waiter: () async -> Void

    /// Private init; creates a closure that can be waited on
    fileprivate init(waiter: @escaping () async -> Void) {
        self.waiter = waiter
    }

    /// Wait on the condition to become true
    public func wait() async {
        await waiter()
    }

    /// Construct a Condition/Signal pair. The Task that wants to wait on something to
    /// happen takes the Condition, the Task that notifies of the condition takes
    /// the Signal.
    public static func makeCondition() -> (Condition, Signal) {
        let (stream, continuation) = AsyncStream<Void>.makeStream()
        let condition = Condition {
            for await _ in stream {}
        }
        let signal = Signal(stream: continuation)
        return (condition, signal)
    }
}

Conclusion

In this post I described a variation of a Task synchronization problem. In this variation, a Task wants to wait on a condition becoming true, as opposed to a Task being completed. I then introduced a simplified version of a traditional synchronization mechanism called a condition variable as a way of solving this problem. Finally, I demonstrated a working async/await solution using AsyncStream.

Read the follow-up post.