Handling periodic tasks in Swift Vapor
In my Swift Vapor side project, there have been a couple of instances where I wanted to run some periodic tasks. My needs were pretty simple: every X amount of time, I wanted to run a closure that would perform some database operations. On top of that, it would be nice to have a tiny bit of infrastructure to make periodic jobs require less boilerplate, have visibility in production (i.e. logging if the job was successful or not), and the ability to verify the jobs in integration tests.
In this post, I’m going to flesh out a periodic job interface, lay out some implementation approaches, talk through what I actually did, and finally show how I tested it.
A periodic example
First, I’ll define how I’d like to be able to schedule a periodic task in client code. Here’s an excerpt from my app:
func boot(_ app: Application) throws {
    let jobQueue = try app.make(JobQueue.self)
    jobQueue.schedule(initialDelay: .hours(1),
                      delay: .hours(24),
                      withName: "Accounts.cleanup") { container, connection -> Future<Void> in
        return self.cleanup(with: container, on: connection)
    }
}

private func cleanup(with container: Container, on connection: DatabaseConnectable) -> Future<Void> {
    return // task that performs clean up
}
I didn’t need a complicated interface to schedule a periodic job. The initial delay and repeated delay are givens for any periodic task. The name is a bit extra, but it turned out to be quite helpful when testing and logging. The “job” itself is a closure that’s passed a dependency injection container and a database connection. Those two things are needed for just about any meaningful work in Vapor. Finally, the closure returns a Future<Void>, which the job infrastructure uses to know when the job is complete and whether it succeeded.
Although the JobQueue interface is inferable from the example above, here’s exactly how I’ve defined it for my app:
import Foundation
import Vapor

typealias Job = (Container, DatabaseConnectable) -> Future<Void>

protocol JobQueue: Service {
    func schedule(initialDelay: TimeAmount, delay: TimeAmount, withName name: String, _ job: @escaping Job)
}
I’ve defined it as a protocol because I’ll inject a fake implementation in my integration tests. Since I’m taking advantage of the dependency injection system in Vapor, I conform the protocol to Service. But all I needed was one method on the protocol to actually schedule the periodic job. TimeAmount is a datatype defined down in the NIO framework.
JobQueue Decisions
Now that I’ve defined the JobQueue interface, I need to figure out how to implement it. I thought of a few options.
One option was to register a timer in my app at the given interval, and run the periodic job closure then and there. The benefit of this is that it is simple to understand and implement. The downside is it doesn’t scale up or down very well. If my host spins up more than one instance of my app, every instance will try to run this periodic task. Or, if I’m on a hobbyist plan, and my host spins down all my instances because of lack of requests, then my periodic task won’t run at all.
Another option was to find or build something like Ruby’s Resque but for Swift, and use a timer to schedule a background job on Resque-but-for-Swift to perform the periodic job. The benefit would be the job would only be run once no matter how many instances of my app were spun up, plus job persistence. The downside is, as far as my Google searches show, such a thing does not exist, and building it myself would be a significant undertaking.
There were also some options that didn’t fit the interface I gave above, but could be legitimate solutions. First up, I could define a Vapor command that’s invokable from my app’s command line that performs the periodic job. Then, using my host’s infrastructure, I could manually invoke the command when I think it needs to be run. The downside is it relies on me remembering to run the command. However, it wouldn’t need any internal job infrastructure, and would only run on the instance I wanted. A second option would be to do nothing. If my periodic job didn’t run, nothing bad would happen for a very long time. The upside to this is it costs nothing to implement. The downside is there is a timebomb in my app that will one day go off. But that might be fine because the app might not last that long, or might be completely redesigned so the periodic job is unnecessary.
In the end, I went with the first solution of registering a timer in my app, and running the closure when it triggered. I decided this for a few reasons. First, I’m lazy and forgetful and won’t remember to run some manual command. Second, my side project probably won’t ever scale beyond one instance. Third, even if it did scale up, the periodic tasks running concurrently wouldn’t hurt anything. Finally, this solution did something (so no timebomb), but was the least investment that did something.
As far as using a timer in my Vapor app, my first attempt was to use a third-party Swift library that implemented periodic jobs on top of DispatchQueue. That did seem to work, but it happened outside of Vapor’s EventLoops, which always seemed a little janky to me. Later, I discovered that the NIO framework offers RepeatedTasks on its EventLoops, so I used those instead.
Implementing JobQueue
I had an approach now; I just needed to implement it. First up, building the production implementation of JobQueue.
import Foundation
import Vapor

final class ProductionJobQueue: JobQueue {
    private let eventLoop: EventLoop
    private let container: Container
    private let logger: Logger

    init(eventLoop: EventLoop, container: Container, logger: Logger) {
        self.eventLoop = eventLoop
        self.container = container
        self.logger = logger
    }
    ...
}
The ProductionJobQueue needs a few things in order to function. First, it needs an EventLoop so it can schedule RepeatedTasks. Second, it needs a dependency injection Container so it can get a database connection later to hand to a periodic job. Finally, it takes a Logger because the app logs every periodic job invocation to provide some insight into what’s going on in the deployed app.
The only required method of the JobQueue protocol is the schedule() method:
final class ProductionJobQueue: JobQueue {
    ...
    func schedule(initialDelay: TimeAmount, delay: TimeAmount, withName name: String, _ job: @escaping Job) {
        eventLoop.scheduleRepeatedTask(initialDelay: initialDelay, delay: delay) { repeatedTask -> EventLoopFuture<Void> in
            return self.execute(job, withName: name, on: repeatedTask)
        }
    }
    ...
}
This is a thin wrapper around EventLoop.scheduleRepeatedTask(). All the interesting bits of the implementation are in the execute() method, which actually calls the job closure.
final class ProductionJobQueue: JobQueue {
    ...
    private func execute(_ job: @escaping Job, withName name: String, on repeatedTask: RepeatedTask) -> Future<Void> {
        let startTime = Date()
        return container.withPooledConnection(to: .psql) { connection -> Future<Void> in
            return job(self.container, connection)
        }.map {
            self.logSuccess(for: name, whichStartedAt: startTime)
        }.catch { error in
            self.logFailure(error, for: name, whichStartedAt: startTime)
        }
    }
    ...
}
The execute() method provides most of the value-add functionality of the JobQueue. First off, it grabs a database connection for the job, so the job doesn’t have to itself. Once it has a connection, it invokes the job closure with the container and database connection. It waits on the job to complete, then logs the success or failure of the job.
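The wrap, time, and log shape of execute() can also be seen without Vapor. The following is only a minimal sketch under stated assumptions: the job is a synchronous throwing closure rather than one returning a Future<Void>, the log sink is a plain closure instead of Vapor's Logger, and the names SimpleJob, JobOutcome, and runLoggedJob are hypothetical.

```swift
import Foundation

// Hypothetical stand-in for the post's Job type: a synchronous closure
// instead of one returning a Future<Void>.
typealias SimpleJob = () throws -> Void

enum JobOutcome: Equatable {
    case success
    case failure
}

// Runs the job, measures how long it took, and reports the outcome through
// the supplied log closure. In this sketch the error is not rethrown; only
// the outcome is returned to the caller.
@discardableResult
func runLoggedJob(named name: String, log: (String) -> Void, _ job: SimpleJob) -> JobOutcome {
    let startTime = Date()
    let outcome: JobOutcome
    do {
        try job()
        outcome = .success
    } catch {
        outcome = .failure
    }
    let elapsedMs = Date().timeIntervalSince(startTime) * 1000
    let label = outcome == .success ? "SUCCESS" : "FAILURE"
    log("JOB \(name) -> \(label) [\(String(format: "%.2f", elapsedMs))ms]")
    return outcome
}
```

The point of the shape is that callers only supply the work itself; timing and reporting live in one place.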
final class ProductionJobQueue: JobQueue {
    ...
    private func logSuccess(for name: String, whichStartedAt startTime: Date) {
        let time = Date().timeIntervalSince(startTime).asFormattedMilliseconds
        logger.info("JOB \(name) -> SUCCESS [\(time)]")
    }

    private func logFailure(_ error: Error, for name: String, whichStartedAt startTime: Date) {
        let time = Date().timeIntervalSince(startTime).asFormattedMilliseconds
        logger.info("JOB \(name) -> FAILURE [\(time)]")
    }
}
The logging of jobs is simple, but provides some necessary transparency. It just logs the name of the job, whether it was a success or failure, and the time it took to complete. Even though I capture the error in the failure case, I don’t log it for now. That’s because it might contain sensitive info (like database connection info) that I don’t want saved out to a log. In the future, I might log the name of the error type, if I discover I need more information when debugging production issues.
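Logging only the error's type name, as mentioned above, could look something like the sketch below. ConnectionError and redactedDescription(of:) are hypothetical names made up for illustration; they are not part of the app or of Vapor.

```swift
import Foundation

// Hypothetical error carrying sensitive details, used only for illustration.
struct ConnectionError: Error {
    let connectionString: String
}

// Returns only the name of the error's dynamic type, never its contents.
func redactedDescription(of error: Error) -> String {
    return String(describing: type(of: error))
}

let error = ConnectionError(connectionString: "postgres://user:secret@host/db")
print("JOB Accounts.cleanup -> FAILURE (\(redactedDescription(of: error)))")
// prints "JOB Accounts.cleanup -> FAILURE (ConnectionError)"
```

This keeps the connection string out of the log while still hinting at what kind of failure occurred.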
The asFormattedMilliseconds property above is an extraction of code from my earlier Swift Vapor logging post:
extension TimeInterval {
    private static let intervalFormatter: NumberFormatter = {
        let formatter = NumberFormatter()
        formatter.numberStyle = .decimal
        formatter.maximumFractionDigits = 2
        formatter.multiplier = 1000
        return formatter
    }()

    var asFormattedMilliseconds: String {
        return TimeInterval.intervalFormatter.string(for: self).map { $0 + "ms" } ?? "???ms"
    }
}
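One thing to be aware of is that a NumberFormatter uses the current locale by default, so the separators in the output can differ between machines. If fixed output matters for grepping logs, a locale-independent variant might look like this sketch. It is an alternative technique, not the code from the app: asFixedMilliseconds is a hypothetical name, and unlike the formatter above it always prints exactly two fraction digits.

```swift
import Foundation

extension TimeInterval {
    // Hypothetical alternative to asFormattedMilliseconds: always two
    // fraction digits and a "." separator, whatever the current locale is.
    var asFixedMilliseconds: String {
        return String(format: "%.2f", self * 1000) + "ms"
    }
}

let elapsed: TimeInterval = 0.25
print(elapsed.asFixedMilliseconds) // prints "250.00ms"
```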
Finally, I needed to implement the Service protocol so the class could be used in the dependency injection system.
extension ProductionJobQueue: ServiceType {
    static var serviceSupports: [Any.Type] {
        return [JobQueue.self]
    }

    static func makeService(for worker: Container) throws -> ProductionJobQueue {
        return try ProductionJobQueue(eventLoop: worker.eventLoop, container: worker, logger: worker.make())
    }
}
As usual, I used ServiceType to conform to the Service protocol. This allowed me to register with only the type, plus have a dependency injection container available when the instance was created. This implementation was a bit different than usual, in that I needed to override the static serviceSupports property to include the JobQueue protocol. This is needed because by default ServiceType only registers the concrete type. By listing JobQueue as supported, client code can ask the container for JobQueue and get a ProductionJobQueue in the production app. The makeService() method takes full advantage of being given a Container: it passes in the event loop from it, the container itself, and creates a logger from the container.
Finally, ProductionJobQueue is registered as a service from the ProductionConfiguration. At this point, periodic jobs should work in production. But it would be nice to test them.
Testing
I spent some time thinking about what and how to test periodic jobs. I knew I wanted integration level testing, and that I’d facilitate that by building a fake implementation of JobQueue. I needed a fake because waiting on timers to fire in a test isn’t a viable option, especially if they’re only firing once a day. Also, for me, integration testing meant I should be able to verify that the periodic job was both registered and performed what it was supposed to. I considered two possible solutions.
The first idea was to have a fake job queue that individual tests could manipulate the clock on. That is, a test could say “pretend two hours have elapsed” to the fake job queue, and the job queue would go look through the registered jobs and fire the ones that should have fired by then. The upside to this approach is it could be used to validate the initial and repeating delays given at registration were working. The downside is this would require a sophisticated fake that could do time calculations reliably. This would give me less confidence that I was testing production code, as opposed to testing my fake job queue.
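To give a feel for what the clock-driven fake would involve, here is a rough, Vapor-free sketch. It is not something I built: ClockedFakeQueue is a hypothetical name, and delays are plain seconds rather than NIO's TimeAmount to keep the example self-contained.

```swift
import Foundation

// Hypothetical clock-driven fake. Delays are plain seconds here, an assumption
// made to keep the sketch free of NIO's TimeAmount.
final class ClockedFakeQueue {
    private struct Entry {
        let delay: TimeInterval
        var nextFire: TimeInterval
        let job: () -> Void
    }

    private var entries: [Entry] = []
    private(set) var now: TimeInterval = 0

    func schedule(initialDelay: TimeInterval, delay: TimeInterval, _ job: @escaping () -> Void) {
        precondition(delay > 0, "a zero delay would never stop firing")
        entries.append(Entry(delay: delay, nextFire: now + initialDelay, job: job))
    }

    // Moves the pretend clock forward and fires every job that came due,
    // once per elapsed interval.
    func advance(by interval: TimeInterval) {
        now += interval
        for index in entries.indices {
            while entries[index].nextFire <= now {
                entries[index].job()
                entries[index].nextFire += entries[index].delay
            }
        }
    }
}

let queue = ClockedFakeQueue()
var runs = 0
queue.schedule(initialDelay: 3600, delay: 86_400) { runs += 1 }
queue.advance(by: 7200)    // two hours: past the initial delay, so one run
queue.advance(by: 86_400)  // one more day: a second run
```

Even this small version leaves questions open, such as the order in which different jobs fire within one advance, or how long a job itself takes. Those are exactly the details a fake could get subtly wrong.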
The second idea was to require each periodic job to provide a unique name. Then individual tests could ask the fake job queue to invoke a periodic job by that name. The upside is the fake is simple and predictable. The downside is jobs have to have globally unique names, which is a bit of overhead.
I decided on the second option to invoke jobs by name. Because of the logging done in production, the requirement that jobs have globally unique names was already there, so the downside was moot. Additionally, the “upside” of the first option was validating the delay values. But in my case, they were always constants, so there seemed to be little value in validating them. Finally, the ease of implementing the second option was hard to ignore.
This approach turned out to be easy to use in my tests. As an example:
func testRemoveExpiredChallenges() throws {
    // setup
    try app.jobQueue.execute("Accounts.cleanup", container: app.container, connection: connection)
    // validate here
}
I ended up redacting all the setup and validation code for this test for succinctness, and because all of it was specific to the app and not that interesting. The only part left is invoking the periodic job on the fake job queue, which is done by passing the job name, a Container, and a connection to the database. The app seen here is a TestApplication, which I covered in a post about integration testing Swift Vapor.
Because of my selected testing approach, TestJobQueue was easy to implement. Here’s how I conformed it to the JobQueue protocol:
import Foundation
import Vapor

final class TestJobQueue: JobQueue {
    struct Entry {
        let initialDelay: TimeAmount
        let delay: TimeAmount
        let name: String
        let job: Job
    }

    var schedule_wasCalled = false
    var schedule_wasCalled_withEntries = [Entry]()

    func schedule(initialDelay: TimeAmount, delay: TimeAmount, withName name: String, _ job: @escaping Job) {
        schedule_wasCalled = true
        let entry = Entry(initialDelay: initialDelay, delay: delay, name: name, job: job)
        schedule_wasCalled_withEntries.append(entry)
    }
    ...
}
The schedule() method is just a recorder. It creates an entry for each scheduled job and stores it in an array. The important bits that are used later are the name and job. I ended up storing the initialDelay and delay as well, so if a test actually wanted to validate them, it could.
The other interesting piece of the TestJobQueue class is the execute() method that tests call when they want to fake a periodic job being triggered.
final class TestJobQueue: JobQueue {
    ...
    func execute(_ name: String, container: Container, connection: DatabaseConnectable) throws {
        guard let entry = schedule_wasCalled_withEntries.first(where: { $0.name == name }) else {
            return
        }
        try entry.job(container, connection).wait()
    }
}
My implementation is as simple as I could make it. It searches the array of entries looking for the first job with the matching name. If it can’t find a matching job, it silently fails. This is intentional: it mimics what would happen in production if a periodic job wasn’t properly registered. Namely, nothing would happen. It is up to the individual test cases to verify that a job’s side effects are happening. However, execute() does wait on the job to complete before returning, just to make the tests simpler.
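The record-then-invoke-by-name idea can be shown in miniature without Vapor. This is only a sketch under assumptions: MiniJobQueue is a hypothetical name, jobs are plain throwing closures instead of closures returning Future<Void>, and, as a variation on my fake, execute() reports whether the job was found so a test could tell a typo in the name apart from a job that ran.

```swift
import Foundation

// Hypothetical, Vapor-free miniature of the name-based fake. Jobs are plain
// throwing closures rather than closures returning Future<Void>.
final class MiniJobQueue {
    private(set) var jobsByName: [String: () throws -> Void] = [:]

    func schedule(withName name: String, _ job: @escaping () throws -> Void) {
        jobsByName[name] = job
    }

    // Runs the named job if it was registered and reports whether it was found.
    // Unlike the post's execute(), the caller can tell a missing job apart
    // from one that ran.
    @discardableResult
    func execute(_ name: String) throws -> Bool {
        guard let job = jobsByName[name] else { return false }
        try job()
        return true
    }
}

let queue = MiniJobQueue()
var cleaned = false
queue.schedule(withName: "Accounts.cleanup") { cleaned = true }

let found = try queue.execute("Accounts.cleanup")
let missing = try queue.execute("Accounts.typo")
print(found, cleaned, missing) // prints "true true false"
```

Whether the extra return value is worth it is a judgment call; validating side effects, as my tests do, catches the same mistake one step later.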
The last bit needed for TestJobQueue is registering it with the dependency injection system. I did it differently from the production job queue, however. ProductionJobQueue conformed to ServiceType because it needed access to a dependency injection container to initialize. TestJobQueue doesn’t have the same requirement to instantiate. Further, it would be useful to the tests if the TestJobQueue were exposed to them as that type, so they can access the execute() method. Otherwise they’d have to force cast a JobQueue to a TestJobQueue, and I find that more than a little gross.
For that reason, I modified TestingConfiguration to hold an exact instance:
struct TestingConfiguration: ConfigurationType {
    ...
    let jobQueue = TestJobQueue()

    func configure(_ services: inout Services) throws {
        ...
        services.register(jobQueue, as: JobQueue.self)
        ...
    }
    ...
}
This is the TestingConfiguration that I described in how I do environment configuration for Swift Vapor. Here, it registers a member variable of type TestJobQueue as a JobQueue. By making it publicly available on TestingConfiguration, the TestApplication has easy access to it.
The final piece is exposing the TestJobQueue on TestApplication so it’s convenient for tests to use.
class TestApplication {
    ...
    var jobQueue: TestJobQueue {
        return configuration.jobQueue
    }
    ...
}
Since the TestApplication already has a reference to TestingConfiguration, it can return that instance. The test code now works.
Conclusion
My Swift Vapor app had a need to run some clean up jobs periodically. The requirements were simple: it was ok if the job ran on multiple instances or not at all, but it should make a good faith effort to run at least once. I wanted a class that reduced the necessary boilerplate for scheduling a job, while adding logging to the jobs, and being testable. I settled on NIO’s RepeatedTasks to implement timers to run the periodic tasks in each app instance. For testing, I kept the testing fake simple: the tests could invoke jobs by name, and then validate the side effects.