Queues

Vapor Queues (vapor/queues) is a pure Swift queuing system that lets you offload work from your request handlers to a separate worker.

Some of the tasks this package works well for:

  • Sending emails outside of the main request thread
  • Performing complex or long-running database operations
  • Ensuring job integrity and resilience
  • Speeding up response time by delaying non-critical processing
  • Scheduling jobs to occur at a specific time

This package is similar to Ruby Sidekiq. It provides the following features:

  • Safe handling of SIGTERM and SIGINT signals sent by hosting providers to indicate a shutdown, restart, or new deploy.
  • Different queue priorities. For example, you can specify a queue job to be run on the email queue and another job to be run on the data-processing queue.
  • Implements the reliable queue process to help with unexpected failures.
  • Includes a maxRetryCount feature that retries a failed job until it succeeds, up to a specified number of retries.
  • Uses NIO to utilize all available cores and EventLoops for jobs.
  • Allows users to schedule repeating tasks.

Queues currently has one officially supported driver, which interfaces with the main protocol:

  • QueuesRedisDriver

Queues also has community-based drivers:

  • QueuesMongoDriver
  • QueuesFluentDriver

Tip

You should not install the vapor/queues package directly unless you are building a new driver. Install one of the driver packages instead.

Getting Started

Let's take a look at how you can get started using Queues.

Package

The first step to using Queues is adding one of the drivers as a dependency to your project in your SwiftPM package manifest file. In this example, we'll use the Redis driver.

// swift-tools-version:5.8
import PackageDescription

let package = Package(
    name: "MyApp",
    dependencies: [
        /// Any other dependencies ...
        .package(url: "https://github.com/vapor/queues-redis-driver.git", from: "1.0.0"),
    ],
    targets: [
        .executableTarget(name: "App", dependencies: [
            // Other dependencies
            .product(name: "QueuesRedisDriver", package: "queues-redis-driver")
        ]),
        .testTarget(name: "AppTests", dependencies: [.target(name: "App")]),
    ]
)

If you edit the manifest directly inside Xcode, it will automatically pick up the changes and fetch the new dependency when the file is saved. Otherwise, from Terminal, run swift package resolve to fetch the new dependency.

Config

The next step is to configure Queues in configure.swift. We'll use the Redis library as an example:

import QueuesRedisDriver

try app.queues.use(.redis(url: "redis://127.0.0.1:6379"))

Registering a Job

After modeling a job you must add it to your configuration section like this:

// Register jobs
let emailJob = EmailJob()
app.queues.add(emailJob)

Running Workers as Processes

To start a new queue worker, run swift run App queues. You can also specify a specific type of worker to run: swift run App queues --queue emails.

Tip

Workers should stay running in production. Consult your hosting provider to find out how to keep long-running processes alive. Heroku, for example, allows you to specify "worker" dynos like this in your Procfile: worker: App queues. With this in place, you can start workers on the Dashboard/Resources tab, or with heroku ps:scale worker=1 (or any number of dynos preferred).

Running Workers in-process

To run a worker in the same process as your application (as opposed to starting a whole separate server to handle it), call the convenience methods on Application:

try app.queues.startInProcessJobs(on: .default)

To run scheduled jobs in process, call the following method:

try app.queues.startScheduledJobs()

Warning

If you don't start a queue worker, either via the command line or in-process, dispatched jobs will stay in the queue and never run.

The Job Protocol

Jobs are defined by the Job or AsyncJob protocol.

Modeling a Job object:

import Vapor 
import Foundation 
import Queues 

struct Email: Codable {
    let to: String
    let message: String
}

struct EmailJob: Job {
    typealias Payload = Email

    func dequeue(_ context: QueueContext, _ payload: Email) -> EventLoopFuture<Void> {
        // This is where you would send the email
        return context.eventLoop.future()
    }

    func error(_ context: QueueContext, _ error: Error, _ payload: Email) -> EventLoopFuture<Void> {
        // If you don't want to handle errors you can simply return a future. You can also omit this function entirely. 
        return context.eventLoop.future()
    }
}

If using async/await you should use AsyncJob:

struct EmailJob: AsyncJob {
    typealias Payload = Email

    func dequeue(_ context: QueueContext, _ payload: Email) async throws {
        // This is where you would send the email
    }

    func error(_ context: QueueContext, _ error: Error, _ payload: Email) async throws {
        // If you don't want to handle errors you can simply return. You can also omit this function entirely. 
    }
}

Info

Make sure your Payload type implements the Codable protocol.
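The reason for the Codable requirement is that a payload has to be serialized when the job is dispatched and rebuilt when a worker picks it up. The sketch below illustrates that round trip with Foundation only. JSON is an assumption made for illustration; the actual storage format is up to the driver.

```swift
import Foundation

// Same payload shape as the Email struct above (Equatable added for the comparison).
struct Email: Codable, Equatable {
    let to: String
    let message: String
}

// Encode the payload as it might be stored by a queue backend.
// JSON is assumed here; the real wire format depends on the driver.
let original = Email(to: "email@email.com", message: "message")
let stored = try JSONEncoder().encode(original)

// A worker later decodes the stored bytes back into the payload type.
let restored = try JSONDecoder().decode(Email.self, from: stored)
print(restored == original) // prints "true"
```

Anything that cannot survive this kind of round trip (open connections, closures, references to live services) does not belong in a payload; pass identifiers and look the data up inside the job instead.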

Tip

Don't forget to follow the instructions in Getting Started to add this job to your configuration file.

Dispatching Jobs

To dispatch a queue job, you need access to an instance of Application or Request. You will most likely be dispatching jobs inside of a route handler:

app.get("email") { req -> EventLoopFuture<String> in
    return req
        .queue
        .dispatch(
            EmailJob.self, 
            .init(to: "email@email.com", message: "message")
        ).map { "done" }
}

// or

app.get("email") { req async throws -> String in
    try await req.queue.dispatch(
        EmailJob.self, 
        .init(to: "email@email.com", message: "message"))
    return "done"
}

If you need to dispatch a job from a context where the Request object is not available (for example, from within a Command), use the queues property on the Application object:

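struct SendEmailCommand: AsyncCommand {
    // AsyncCommand requires a signature type and a help string.
    struct Signature: CommandSignature {}
    let help = "Dispatches an EmailJob to the default queue."

    func run(using context: CommandContext, signature: Signature) async throws {
        // The async dispatch call must be awaited, otherwise the job is never enqueued.
        try await context
            .application
            .queues
            .queue
            .dispatch(
                EmailJob.self,
                .init(to: "email@email.com", message: "message")
            )
    }
}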

Setting maxRetryCount

Jobs will automatically retry themselves upon error if you specify a maxRetryCount. For example:

app.get("email") { req -> EventLoopFuture<String> in
    return req
        .queue
        .dispatch(
            EmailJob.self, 
            .init(to: "email@email.com", message: "message"),
            maxRetryCount: 3
        ).map { "done" }
}

// or

app.get("email") { req async throws -> String in
    try await req.queue.dispatch(
        EmailJob.self, 
        .init(to: "email@email.com", message: "message"),
        maxRetryCount: 3)
    return "done"
}
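To make the retry behaviour concrete, here is a minimal, framework-free sketch of a retry loop. It assumes that maxRetryCount counts retries after the first attempt, so a value of 3 allows up to four attempts in total; the package's exact accounting may differ. The names runWithRetries and TransientError are hypothetical and are not part of Queues.

```swift
// Hypothetical error type used only for this sketch.
struct TransientError: Error {}

/// Runs `work` once, then retries up to `maxRetryCount` more times on failure.
/// Returns the number of attempts made; rethrows the last error when retries run out.
func runWithRetries(maxRetryCount: Int, _ work: (Int) throws -> Void) throws -> Int {
    var attempt = 0
    while true {
        attempt += 1
        do {
            try work(attempt)
            return attempt
        } catch {
            // Give up once the first attempt plus all retries have failed.
            if attempt > maxRetryCount { throw error }
        }
    }
}

// A job that fails twice and succeeds on the third attempt.
let attempts = try runWithRetries(maxRetryCount: 3) { attempt in
    if attempt < 3 { throw TransientError() }
}
print(attempts) // prints "3"
```

Because a job may run more than once, it is worth writing dequeue so that repeating it is harmless, for example by checking whether the email was already sent.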

Specifying a delay

Jobs can also be set to only run after a certain Date has passed. To specify a delay, pass a Date into the delayUntil parameter in dispatch:

app.get("email") { req async throws -> String in
    let futureDate = Date(timeIntervalSinceNow: 60 * 60 * 24) // One day
    try await req.queue.dispatch(
        EmailJob.self, 
        .init(to: "email@email.com", message: "message"),
        maxRetryCount: 3,
        delayUntil: futureDate)
    return "done"
}

If a job is dequeued before its delayUntil date has passed, the driver re-queues it instead of running it.
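The re-queue rule can be pictured with a small, self-contained sketch. PendingJob and partition are hypothetical names invented for this illustration; they do not mirror the internals of any particular driver.

```swift
import Foundation

// Hypothetical in-memory job record; real drivers store more metadata.
struct PendingJob {
    let id: String
    let delayUntil: Date?
}

/// Splits dequeued jobs into those ready to run now and those to put back.
func partition(_ jobs: [PendingJob], now: Date) -> (ready: [PendingJob], requeued: [PendingJob]) {
    var ready: [PendingJob] = []
    var requeued: [PendingJob] = []
    for job in jobs {
        if let delay = job.delayUntil, delay > now {
            requeued.append(job)   // too early: goes back on the queue
        } else {
            ready.append(job)      // no delay, or the delay has passed
        }
    }
    return (ready, requeued)
}

let now = Date()
let jobs = [
    PendingJob(id: "a", delayUntil: nil),
    PendingJob(id: "b", delayUntil: now.addingTimeInterval(60 * 60 * 24)),
    PendingJob(id: "c", delayUntil: now.addingTimeInterval(-1)),
]
let result = partition(jobs, now: now)
print(result.ready.map(\.id), result.requeued.map(\.id)) // prints ["a", "c"] ["b"]
```

Under this model, delayUntil is a lower bound: the job runs at the first opportunity after that date, not at that exact instant.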

Specifying a priority

Jobs can be sorted into different queue types/priorities depending on your needs. For example, you may want to open an email queue and a background-processing queue to sort jobs.

Start by extending QueueName:

extension QueueName {
    static let emails = QueueName(string: "emails")
}

Then, specify the queue name when you retrieve the queue:

app.get("email") { req -> EventLoopFuture<String> in
    let futureDate = Date(timeIntervalSinceNow: 60 * 60 * 24) // One day
    return req
        .queues(.emails)
        .dispatch(
            EmailJob.self, 
            .init(to: "email@email.com", message: "message"),
            maxRetryCount: 3,
            delayUntil: futureDate
        ).map { "done" }
}

// or

app.get("email") { req async throws -> String in
    let futureDate = Date(timeIntervalSinceNow: 60 * 60 * 24) // One day
    try await req
        .queues(.emails)
        .dispatch(
            EmailJob.self, 
            .init(to: "email@email.com", message: "message"),
            maxRetryCount: 3,
            delayUntil: futureDate
        )
    return "done"
}

When dispatching through the Application object, select the queue as follows:

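struct SendEmailCommand: AsyncCommand {
    // AsyncCommand requires a signature type and a help string.
    struct Signature: CommandSignature {}
    let help = "Dispatches an EmailJob to the emails queue."

    func run(using context: CommandContext, signature: Signature) async throws {
        let futureDate = Date(timeIntervalSinceNow: 60 * 60 * 24) // One day
        // The async dispatch call must be awaited, otherwise the job is never enqueued.
        try await context
            .application
            .queues
            .queue(.emails)
            .dispatch(
                EmailJob.self,
                .init(to: "email@email.com", message: "message"),
                maxRetryCount: 3,
                delayUntil: futureDate
            )
    }
}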

If you do not specify a queue, the job will run on the default queue. Make sure to follow the instructions in Getting Started to start workers for each queue type.

Scheduling Jobs

The Queues package also allows you to schedule jobs to occur at certain points in time.

Warning

Scheduled jobs only work when set up before the application boots up, such as in configure.swift. They will not work in route handlers.

Starting the scheduler worker

The scheduler requires a separate worker process to be running, similar to the queue worker. You can start the worker by running this command:

swift run App queues --scheduled

Tip

Workers should stay running in production. Consult your hosting provider to find out how to keep long-running processes alive. Heroku, for example, allows you to specify "worker" dynos like this in your Procfile: worker: App queues --scheduled

Creating a ScheduledJob

Start by creating a new ScheduledJob or AsyncScheduledJob:

import Vapor
import Queues

struct CleanupJob: ScheduledJob {
    // Add extra services here via dependency injection, if you need them.

    func run(context: QueueContext) -> EventLoopFuture<Void> {
        // Do some work here, perhaps queue up another job.
        return context.eventLoop.makeSucceededFuture(())
    }
}

// or

struct CleanupJob: AsyncScheduledJob {
    // Add extra services here via dependency injection, if you need them.

    func run(context: QueueContext) async throws {
        // Do some work here, perhaps queue up another job.
    }
}

Then, in your configure code, register the scheduled job:

app.queues.schedule(CleanupJob())
    .yearly()
    .in(.may)
    .on(23)
    .at(.noon)

The job in the example above will be run every year on May 23rd at 12:00 PM.

Tip

The scheduler uses the time zone of your server.
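To see what "every year on May 23rd at 12:00 PM" means as a concrete date, and why the time zone matters, the sketch below computes the next matching moment with Foundation's Calendar. It only illustrates the date arithmetic; it is not how the scheduler is implemented, and the fixed UTC calendar is an assumption chosen to keep the result reproducible.

```swift
import Foundation

// Use a fixed time zone so the result does not depend on the machine running this.
var calendar = Calendar(identifier: .gregorian)
calendar.timeZone = TimeZone(secondsFromGMT: 0)!

// May 23rd at 12:00, matching the schedule above.
let schedule = DateComponents(month: 5, day: 23, hour: 12, minute: 0, second: 0)

// Reference point: 1 January 2024, 00:00 UTC.
let start = calendar.date(from: DateComponents(year: 2024, month: 1, day: 1))!

// Next moment after `start` that matches the schedule.
let next = calendar.nextDate(after: start, matching: schedule, matchingPolicy: .nextTime)!
let parts = calendar.dateComponents([.year, .month, .day, .hour], from: next)
print(parts.year!, parts.month!, parts.day!, parts.hour!) // prints "2024 5 23 12"
```

Swapping in a calendar with a different time zone shifts the absolute instant of "noon", which is why a server configured for UTC and one configured for local time will fire the same schedule at different moments.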

Available builder methods

There are six main methods that can be called on a scheduler, each of which creates its respective builder object that contains more helper methods. Keep chaining builder methods until the compiler no longer warns about an unused result. See below for all available methods:

| Helper Function | Available Modifiers | Description |
|---|---|---|
| yearly() | in(_ month: Month) -> Monthly | The month to run the job in. Returns a Monthly object for further building. |
| monthly() | on(_ day: Day) -> Daily | The day to run the job in. Returns a Daily object for further building. |
| weekly() | on(_ weekday: Weekday) -> Daily | The day of the week to run the job on. Returns a Daily object. |
| daily() | at(_ time: Time) | The time to run the job on. Final method in the chain. |
| | at(_ hour: Hour24, _ minute: Minute) | The hour and minute to run the job on. Final method in the chain. |
| | at(_ hour: Hour12, _ minute: Minute, _ period: HourPeriod) | The hour, minute, and period to run the job on. Final method in the chain. |
| hourly() | at(_ minute: Minute) | The minute to run the job at. Final method in the chain. |
| minutely() | at(_ second: Second) | The second to run the job at. Final method in the chain. |

Available helpers

Queues ships with some helper enums to make scheduling easier:

| Helper Function | Available Helper Enum |
|---|---|
| yearly() | .january, .february, .march, ... |
| monthly() | .first, .last, .exact(1) |
| weekly() | .sunday, .monday, .tuesday, ... |
| daily() | .midnight, .noon |

To use a helper enum, call the appropriate modifier on the helper function and pass the value. For example:

// Every year in January 
.yearly().in(.january)

// Every month on the first day 
.monthly().on(.first)

// Every week on Sunday 
.weekly().on(.sunday)

// Every day at midnight
.daily().at(.midnight)

Event Delegates

The Queues package allows you to specify JobEventDelegate objects that will receive notifications when the worker takes action on a job. This can be used for monitoring, surfacing insights, or alerting purposes.

To get started, conform an object to JobEventDelegate and implement any required methods:

struct MyEventDelegate: JobEventDelegate {
    /// Called when the job is dispatched to the queue worker from a route
    func dispatched(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture<Void> {
        eventLoop.future()
    }

    /// Called when the job is placed in the processing queue and work begins
    func didDequeue(jobId: String, eventLoop: EventLoop) -> EventLoopFuture<Void> {
        eventLoop.future()
    }

    /// Called when the job has finished processing and has been removed from the queue
    func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture<Void> {
        eventLoop.future()
    }

    /// Called when the job has finished processing but had an error
    func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture<Void> {
        eventLoop.future()
    }
}

Then, add it in your configuration file:

app.queues.add(MyEventDelegate())

There are a number of third-party packages that use the delegate functionality to provide additional insight into your queue workers.

Testing

To avoid synchronization problems and ensure deterministic testing, the Queues package provides an XCTQueues library and an AsyncTestQueuesDriver driver dedicated to testing, which you can use as follows:

@testable import App
import XCTQueues
import XCTVapor

final class UserCreationServiceTests: XCTestCase {
    var app: Application!

    override func setUp() async throws {
        self.app = try await Application.make(.testing)
        try await configure(app)

        // Override the driver being used for testing
        app.queues.use(.asyncTest)
    }

    override func tearDown() async throws {
        try await self.app.asyncShutdown()
        self.app = nil
    }
}

See more details in Romain Pouclet's blog post.