Alternative title: A concurrency limiter for Swift async/await

This post is really for me and my future self to help clarify my thinking about modeling asynchronous operations in Swift async/await. But I suppose you can read along as well.

In the world prior to async/await I used dispatch queues and OperationQueues to model asynchronous operations. As a result, my thinking about async naturally reverts to the hammer of execution queues. However, I’m starting to realize execution queues are sometimes not a good fit for modeling async operations in Swift async/await. In this post, I’m going to walk through an example where I found that an execution queue wasn’t what I wanted. Instead I wanted to simply limit the number of Tasks that could concurrently pass through a certain block of code. I’ll end with how I solved the limiter problem.

An example of my broken thinking

To return to my previous post’s example, let’s say I’m building an image cache. Part of the caching infrastructure is to actually download the image from the network. Since I can overload the network if I try to download too many images at once, I want to limit this part to ensure only so many download operations are happening concurrently.

The “classic” way to handle the limiting is to schedule the image download operation in an OperationQueue with its maxConcurrentOperationCount set appropriately. Therefore, naturally, I turned to execution queues as a way to solve my concurrent downloads problem: I tried to build a Swift async/await “native” version of a concurrent execution queue. I got something that “mostly” “worked”, for some definition of “mostly” and “worked”. But what I couldn’t make work is a clean way to get the downloaded image back to the caller.
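For reference, the classic setup looks roughly like this. It's a sketch; the queue name and the limit of 4 are arbitrary choices for illustration:

```swift
import Foundation

// The "classic" approach: an OperationQueue capped at a fixed number of
// concurrent operations. The name and limit here are arbitrary.
let downloadQueue = OperationQueue()
downloadQueue.name = "image-downloads"
downloadQueue.maxConcurrentOperationCount = 4

downloadQueue.addOperation {
    // The download would run here, on a thread the queue provides.
    // Getting the result back out to an async caller is the awkward part.
}
```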

When I run into difficulties like this, I stop and try to figure out why it’s difficult. Usually it’s because there’s a mismatch between my mental models, and I’m trying to force a square peg into a round hole.

For my image cache, each fetch image operation is modeled as a Task. In pseudo-code something like this:


Task.detached {
    if let image = await memoryCache.image(for: url) {
        return image
    }
    if let image = await diskCache.image(for: url) {
        return image
    }

    downloadQueue.run {
        // This block actually runs in a Task created by the queue
        let image = await network.downloadImage(for: url)

        // TODO: how do I get `image` back to the calling Task?
    }

    //...
}

Things flow well until I need to actually download the image and want to limit how many concurrent downloads there are via the downloadQueue. How do I get the image back to the calling Task? I mean, I guess I could figure out a way to make a callback work? But that feels super janky, and possibly not safe.

However, looking at this code is when I first realized a couple of things. First, I was switching execution contexts to do the download. Why? I have a perfectly good detached Task already that I would prefer to be executing in. Tasks are supposed to be “recipes”, i.e. a list of steps to perform, which should include the actual download. So, I didn’t need the execution part of an execution queue. Second, what I actually needed it for was limiting the number of concurrent operations; that was its real purpose.

Therefore, my actual problem is I need a way to limit how many concurrent Tasks are calling await network.downloadImage(for: url) at the same time. Can I solve that problem instead?

A concurrency limiter

I’m going to solve this problem of queues with a queue. Obviously. The difference is this queue won’t provide any execution. The calling Task will do that.

I like starting with what I’d like to have at the call site:


Task.detached {
    if let image = await memoryCache.image(for: url) {
        return image
    }
    if let image = await diskCache.image(for: url) {
        return image
    }

    let image = try await downloadLimiter.run {
        // This runs on the calling task. No extra tasks!
        await network.downloadImage(for: url)
    }

    //...
}

The key bit is that the closure passed to run is executed on the calling Task. That makes returning the image back trivial. A call site like the above example would imply an interface something like:


public final class ConcurrencyLimiter {
    /// concurrency is the maximum number of blocks allowed to run concurrently
    public init(concurrency: Int) 

    /// Execute the given block on the calling Task. It may wait first if the
    /// maximum number of blocks are already running.
    public func run<Value>(_ block: @escaping () async throws -> Value) async throws -> Value
}

It has an init so it can know the maximum number of concurrent blocks, and then just a run method that executes the passed-in closure. There’s also a non-throwing variant of run, but it’s a simplified version of the throwing one, so it’s less interesting.

I’ll start on the outside and work my way inside to the details of the implementation. Here’s how I built run:


public func run<Value>(_ block: @escaping () async throws -> Value) async throws -> Value {
    let condition = await counter.enterLimiting()
    await condition.wait()

    do {
        let value = try await block()
        await counter.exitLimiting()
        return value
    } catch {
        await counter.exitLimiting()
        throw error
    }
}

First things first: counter is an actor owned by the ConcurrencyLimiter. It does all the… counting… of how many tasks are running at one time. It’ll also decide which task goes next. So maybe I should have called it Scheduler? This is the kind of chaos that happens when you don’t have code reviews. smh.

run calls into counter to ask to enter the limited section. The counter returns a Condition which the calling Task immediately waits on. The Condition is described in my previous post, and is what allows Counter to control which Task goes next. Once the Task returns from the call to condition.wait(), it executes its code. Before returning or throwing, it calls back into counter via exitLimiting() to let it know this Task is done. (The do/catch duplication is there because defer blocks can’t await, so exitLimiting() has to be called explicitly on both paths.)
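The Condition/Signal pair itself lives in the previous post, but it could plausibly be built on AsyncStream along these lines. This is a sketch with the shape this post relies on, not the real implementation:

```swift
// A sketch of a Condition/Signal pair matching the usage in this post,
// built on AsyncStream. The real type from the previous post may differ.
struct Condition {
    fileprivate let stream: AsyncStream<Void>

    /// Suspends until the paired Signal fires. If the Signal already fired,
    /// the buffered element makes this return immediately.
    func wait() async {
        for await _ in stream { break }
    }

    static func makeCondition() -> (Condition, Signal) {
        var continuation: AsyncStream<Void>.Continuation!
        let stream = AsyncStream<Void> { continuation = $0 }
        return (Condition(stream: stream), Signal(continuation: continuation))
    }
}

struct Signal {
    fileprivate let continuation: AsyncStream<Void>.Continuation

    /// Unblocks the paired Condition's wait().
    func signal() {
        continuation.yield(())
        continuation.finish()
    }
}
```

Because AsyncStream buffers the yielded element, it handles the “signal before wait” ordering that enterLimiting() depends on when it pre-signals.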

That’s it for run. Most of the heavy lifting is in Counter so I’ll look at that next.


final actor Counter {
    /// The maximum amount of concurrency allowed
    private let concurrency: Int
    /// How many blocks are currently in flight
    private var inflightCount = 0
    /// Pending (blocked) blocks in a FIFO queue.
    private var pending = [Signal]()

    init(concurrency: Int) {
        self.concurrency = concurrency
    }
}

There’s not much data needed by the scheduler. Um… I mean Counter. Counter. It has the maximum number of concurrent tasks allowed, how many are actually in flight right now, and an array of Signals in FIFO order, one per waiting Task, each of which will unblock its Task. That’s it.

enterLimiting() is called by the Task when it wants to enter the limited section:


func enterLimiting() -> Condition {
    let shouldWait = inflightCount >= concurrency

    let (condition, signal) = Condition.makeCondition()
    if shouldWait {
        pending.append(signal)
    } else {
        // immediately signal and let it run
        inflightCount += 1
        signal.signal()
    }

    return condition
}

This method is in an actor, and there are no awaits in it, so it runs without reentrancy. That’s important. It computes whether the caller can immediately execute or should wait. Either way, it creates a Condition/Signal pair so it can return the Condition. If the calling Task can execute immediately, it acts like the Task has already started by incrementing the inflight count and pre-signaling (i.e. unblocking) the Condition. If the calling Task needs to wait, the Signal is appended to the end of the waiting queue.

The other end of the process is when the calling Task leaves the limited section.


func exitLimiting() {
    inflightCount -= 1
    let shouldUnblock = inflightCount < concurrency

    guard shouldUnblock, let firstPending = pending.first else {
        return
    }
    pending.removeFirst()
    inflightCount += 1
    firstPending.signal()
}

It immediately updates the inflight count and determines if it can unblock any of the pending Tasks. If it can, and there are Tasks waiting, it counts the next Task as started and unblocks it by signaling that Task’s Condition.

You might have noticed this is a simple FIFO queue for prioritization. However, really any prioritization scheme could be substituted in.
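For example, a priority-aware variant might keep pending sorted by TaskPriority instead of strict arrival order. This is a hypothetical sketch, not part of the original Counter; the Signal here is just a placeholder standing in for the real type:

```swift
// Hypothetical variation: order pending waiters by TaskPriority rather
// than arrival order. `Signal` is a placeholder for the real type.
struct Signal { let id: Int }

struct PriorityPending {
    private var pending = [(priority: TaskPriority, signal: Signal)]()

    /// Insert behind waiters of equal or higher priority, so waiters of
    /// the same priority still unblock in FIFO order.
    mutating func append(_ signal: Signal, priority: TaskPriority) {
        let index = pending.firstIndex { $0.priority.rawValue < priority.rawValue }
            ?? pending.endIndex
        pending.insert((priority, signal), at: index)
    }

    /// Remove and return the highest-priority waiter, if any.
    mutating func removeNext() -> Signal? {
        pending.isEmpty ? nil : pending.removeFirst().signal
    }
}
```

enterLimiting() would then append with the caller’s priority (e.g. Task.currentPriority), and exitLimiting() would signal whatever removeNext() returns.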

Finally, I put a little bit of “what if” insurance in the deinit of the actor. Basically, what happens if the Counter goes out of scope, but there are still Tasks blocked on it?


deinit {
    let localPending = pending
    pending.removeAll()
    for local in localPending {
        local.signal()
    }
}

I make a local copy because I’m just extra paranoid (probably don’t need that), then go through and unblock everything. Presumably no callers care anymore if we’re going out of scope, but I don’t want to leave any Tasks blocked.

(As an aside, I remember from my undergrad days this kind of data structure being called a “monitor.” But Wikipedia says I’m wrong.)

Conclusion

In this post, I described my learning journey where I realized that I didn’t need an execution queue like OperationQueue for my particular problem. Instead, I wanted to let each Task bring its own execution, and just needed a queue to limit the number of concurrent executions. Finally, I demonstrated a simple implementation of a ConcurrencyLimiter using the Condition/Signal pair I introduced in my previous post.