Queues¶
Vapor Queues (vapor/queues) is a pure Swift queuing system that allows you to offload task responsibility to a side worker.
Some of the tasks this package works well for:
- Sending emails outside of the main request thread
- Performing complex or long-running database operations
- Ensuring job integrity and resilience
- Speeding up response time by delaying non-critical processing
- Scheduling jobs to occur at a specific time
This package is similar to Ruby Sidekiq. It provides the following features:
- Safe handling of
SIGTERM
andSIGINT
signals sent by hosting providers to indicate a shutdown, restart, or new deploy. - Different queue priorities. For example, you can specify a queue job to be run on the email queue and another job to be run on the data-processing queue.
- Implements the reliable queue process to help with unexpected failures.
- Includes a
maxRetryCount
feature that will repeat the job until it succeeds up until a specified count. - Uses NIO to utilize all available cores and EventLoops for jobs.
- Allows users to schedule repeating tasks
Queues currently has one officially supported driver which interfaces with the main protocol:
Queues also has community-based drivers: - QueuesMongoDriver - QueuesFluentDriver
Tip
You should not install the vapor/queues
package directly unless you are building a new driver. Install one of the driver packages instead.
Getting Started¶
Let's take a look at how you can get started using Queues.
Package¶
The first step to using Queues is adding one of the drivers as a dependency to your project in your SwiftPM package manifest file. In this example, we'll use the Redis driver.
// swift-tools-version:5.8
import PackageDescription
let package = Package(
name: "MyApp",
dependencies: [
/// Any other dependencies ...
.package(url: "https://github.com/vapor/queues-redis-driver.git", from: "1.0.0"),
],
targets: [
.executableTarget(name: "App", dependencies: [
// Other dependencies
.product(name: "QueuesRedisDriver", package: "queues-redis-driver")
]),
.testTarget(name: "AppTests", dependencies: [.target(name: "App")]),
]
)
If you edit the manifest directly inside Xcode, it will automatically pick up the changes and fetch the new dependency when the file is saved. Otherwise, from Terminal, run swift package resolve
to fetch the new dependency.
Config¶
The next step is to configure Queues in configure.swift
. We'll use the Redis library as an example:
import QueuesRedisDriver
try app.queues.use(.redis(url: "redis://127.0.0.1:6379"))
Registering a Job
¶
After modeling a job you must add it to your configuration section like this:
//Register jobs
let emailJob = EmailJob()
app.queues.add(emailJob)
Running Workers as Processes¶
To start a new queue worker, run swift run App queues
. You can also specify a specific type of worker to run: swift run App queues --queue emails
.
Tip
Workers should stay running in production. Consult your hosting provider to find out how to keep long-running processes alive. Heroku, for example, allows you to specify "worker" dynos like this in your Procfile: worker: Run queues
. With this in place, you can start workers on the Dashboard/Resources tab, or with heroku ps:scale worker=1
(or any number of dynos preferred).
Running Workers in-process¶
To run a worker in the same process as your application (as opposed to starting a whole separate server to handle it), call the convenience methods on Application
:
try app.queues.startInProcessJobs(on: .default)
To run scheduled jobs in process, call the following method:
try app.queues.startScheduledJobs()
Warning
If you don't start the queue worker either via command line or the in-process worker the jobs will not dispatch.
The Job
Protocol¶
Jobs are defined by the Job
or AsyncJob
protocol.
Modeling a Job
object:¶
import Vapor
import Foundation
import Queues
struct Email: Codable {
let to: String
let message: String
}
struct EmailJob: Job {
typealias Payload = Email
func dequeue(_ context: QueueContext, _ payload: Email) -> EventLoopFuture<Void> {
// This is where you would send the email
return context.eventLoop.future()
}
func error(_ context: QueueContext, _ error: Error, _ payload: Email) -> EventLoopFuture<Void> {
// If you don't want to handle errors you can simply return a future. You can also omit this function entirely.
return context.eventLoop.future()
}
}
If using async
/await
you should use AsyncJob
:
struct EmailJob: AsyncJob {
typealias Payload = Email
func dequeue(_ context: QueueContext, _ payload: Email) async throws {
// This is where you would send the email
}
func error(_ context: QueueContext, _ error: Error, _ payload: Email) async throws {
// If you don't want to handle errors you can simply return. You can also omit this function entirely.
}
}
Info
Make sure your Payload
type implements the Codable
protocol.
Tip
Don't forget to follow the instructions in Getting Started to add this job to your configuration file.
Dispatching Jobs¶
To dispatch a queue job, you need access to an instance of Application
or Request
. You will most likely be dispatching jobs inside of a route handler:
app.get("email") { req -> EventLoopFuture<String> in
return req
.queue
.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message")
).map { "done" }
}
// or
app.get("email") { req async throws -> String in
try await req.queue.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"))
return "done"
}
If you, instead, need to dispatch a job from a context where the Request
object is not available (like, for example, from within a Command
), you will need to use the queues
property inside the Application
object, such as:
struct SendEmailCommand: AsyncCommand {
func run(using context: CommandContext, signature: Signature) async throws {
context
.application
.queues
.queue
.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message")
)
}
}
Setting maxRetryCount
¶
Jobs will automatically retry themselves upon error if you specify a maxRetryCount
. For example:
app.get("email") { req -> EventLoopFuture<String> in
return req
.queue
.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"),
maxRetryCount: 3
).map { "done" }
}
// or
app.get("email") { req async throws -> String in
try await req.queue.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"),
maxRetryCount: 3)
return "done"
}
Specifying a delay¶
Jobs can also be set to only run after a certain Date
has passed. To specify a delay, pass a Date
into the delayUntil
parameter in dispatch
:
app.get("email") { req async throws -> String in
let futureDate = Date(timeIntervalSinceNow: 60 * 60 * 24) // One day
try await req.queue.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"),
maxRetryCount: 3,
delayUntil: futureDate)
return "done"
}
If a job is dequeued before its delay parameter, the job will be re-queued by the driver.
Specify a priority¶
Jobs can be sorted into different queue types/priorities depending on your needs. For example, you may want to open an email
queue and a background-processing
queue to sort jobs.
Start by extending QueueName
:
extension QueueName {
static let emails = QueueName(string: "emails")
}
Then, specify the queue type when you retrieve the jobs
object:
app.get("email") { req -> EventLoopFuture<String> in
let futureDate = Date(timeIntervalSinceNow: 60 * 60 * 24) // One day
return req
.queues(.emails)
.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"),
maxRetryCount: 3,
delayUntil: futureDate
).map { "done" }
}
// or
app.get("email") { req async throws -> String in
let futureDate = Date(timeIntervalSinceNow: 60 * 60 * 24) // One day
try await req
.queues(.emails)
.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"),
maxRetryCount: 3,
delayUntil: futureDate
)
return "done"
}
When accessing from within the Application
object you should do as follows:
struct SendEmailCommand: AsyncCommand {
func run(using context: CommandContext, signature: Signature) async throws {
context
.application
.queues
.queue(.emails)
.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"),
maxRetryCount: 3,
delayUntil: futureDate
)
}
}
If you do not specify a queue the job will be run on the default
queue. Make sure to follow the instructions in Getting Started to start workers for each queue type.
Scheduling Jobs¶
The Queues package also allows you to schedule jobs to occur at certain points in time.
Warning
Scheduled jobs only work when set up before the application boots up, such as in configure.swift
. They will not work in route handlers.
Starting the scheduler worker¶
The scheduler requires a separate worker process to be running, similar to the queue worker. You can start the worker by running this command:
swift run App queues --scheduled
Tip
Workers should stay running in production. Consult your hosting provider to find out how to keep long-running processes alive. Heroku, for example, allows you to specify "worker" dynos like this in your Procfile: worker: App queues --scheduled
Creating a ScheduledJob
¶
To begin, start by creating a new ScheduledJob
or AsyncScheduledJob
:
import Vapor
import Queues
struct CleanupJob: ScheduledJob {
// Add extra services here via dependency injection, if you need them.
func run(context: QueueContext) -> EventLoopFuture<Void> {
// Do some work here, perhaps queue up another job.
return context.eventLoop.makeSucceededFuture(())
}
}
struct CleanupJob: AsyncScheduledJob {
// Add extra services here via dependency injection, if you need them.
func run(context: QueueContext) async throws {
// Do some work here, perhaps queue up another job.
}
}
Then, in your configure code, register the scheduled job:
app.queues.schedule(CleanupJob())
.yearly()
.in(.may)
.on(23)
.at(.noon)
The job in the example above will be run every year on May 23rd at 12:00 PM.
Tip
The Scheduler takes the timezone of your server.
Available builder methods¶
There are five main methods that can be called on a scheduler, each of which creates its respective builder object that contains more helper methods. You should continue building out a scheduler object until the compiler does not give you a warning about an unused result. See below for all available methods:
Helper Function | Available Modifiers | Description |
---|---|---|
yearly() |
in(_ month: Month) -> Monthly |
The month to run the job in. Returns a Monthly object for further building. |
monthly() |
on(_ day: Day) -> Daily |
The day to run the job in. Returns a Daily object for further building. |
weekly() |
on(_ weekday: Weekday) -> Daily |
The day of the week to run the job on. Returns a Daily object. |
daily() |
at(_ time: Time) |
The time to run the job on. Final method in the chain. |
at(_ hour: Hour24, _ minute: Minute) |
The hour and minute to run the job on. Final method in the chain. | |
at(_ hour: Hour12, _ minute: Minute, _ period: HourPeriod) |
The hour, minute, and period to run the job on. Final method of the chain | |
hourly() |
at(_ minute: Minute) |
The minute to run the job at. Final method of the chain. |
minutely() |
at(_ second: Second) |
The second to run the job at. Final method of the chain. |
Available helpers¶
Queues ships with some helpers enums to make scheduling easier:
Helper Function | Available Helper Enum |
---|---|
yearly() |
.january , .february , .march , ... |
monthly() |
.first , .last , .exact(1) |
weekly() |
.sunday , .monday , .tuesday , ... |
daily() |
.midnight , .noon |
To use the helper enum, call in to the appropriate modifier on the helper function and pass the value. For example:
// Every year in January
.yearly().in(.january)
// Every month on the first day
.monthly().on(.first)
// Every week on Sunday
.weekly().on(.sunday)
// Every day at midnight
.daily().at(.midnight)
Event Delegates¶
The Queues package allows you to specify JobEventDelegate
objects that will receive notifications when the worker takes action on a job. This can be used for monitoring, surfacing insights, or alerting purposes.
To get started, conform an object to JobEventDelegate
and implement any required methods
struct MyEventDelegate: JobEventDelegate {
/// Called when the job is dispatched to the queue worker from a route
func dispatched(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.future()
}
/// Called when the job is placed in the processing queue and work begins
func didDequeue(jobId: String, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.future()
}
/// Called when the job has finished processing and has been removed from the queue
func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.future()
}
/// Called when the job has finished processing but had an error
func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.future()
}
}
Then, add it in your configuration file:
app.queues.add(MyEventDelegate())
There are a number of third-party packages that use the delegate functionality to provide additional insight into your queue workers:
Testing¶
To avoid synchronization problems and ensure deterministic testing, the Queues package provides an XCTQueue
library and an AsyncTestQueuesDriver
driver dedicated to testing which you can use as follows:
final class UserCreationServiceTests: XCTestCase {
var app: Application!
override func setUp() async throws {
self.app = try await Application.make(.testing)
try await configure(app)
// Override the driver being used for testing
app.queues.use(.asyncTest)
}
override func tearDown() async throws {
try await self.app.asyncShutdown()
self.app = nil
}
}
See more details in Romain Pouclet's blog post.