コンテンツにスキップ

Queues

Vapor Queues (vapor/queues) は、タスクの責任をサイドワーカーに譲渡することができる、純粋な Swift のキューシステムです。

このパッケージが適しているタスクの例:

  • メインのリクエストスレッド外でのメール送信
  • 複雑または長時間かかるデータベース操作の実行
  • ジョブの整合性と耐障害性の確保
  • 非クリティカルな処理を遅らせることによる応答時間の短縮
  • 特定の時間にジョブをスケジュール

このパッケージは Ruby Sidekiq に似ており、以下の機能を提供します:

  • シャットダウン、再起動、または新しいデプロイを示すためにホスティングプロバイダーから送信される SIGTERM および SIGINT シグナルの安全な処理。
  • 異なる優先度が付いたキュー。例えば、メールキューで実行するジョブとデータ処理キューで実行するジョブの優先度を指定できます。
  • 予期しない障害に対処するための信頼性の高いキュープロセスの実装。
  • 指定された回数までジョブを成功するまで繰り返す maxRetryCount 機能を含む。
  • NIO を使用して、利用可能なすべてのコアと EventLoop をジョブに活用。
  • 定期実行処理をスケジュールする機能を提供。

Queues には、メインプロトコルとインターフェースする正式にサポートされているドライバが 1 つあります:

また、コミュニティベースのドライバもあります: - QueuesMongoDriver - QueuesFluentDriver

Tip

vapor/queues パッケージは、ドライバを新規に構築している場合を除き、直接依存パッケージに追加しないでください。代わりにドライバパッケージのいずれかを追加してください。

はじめに

Queues の使用を開始する方法を見てみましょう。

Package

Queues を使用するための最初のステップは、SwiftPM パッケージのマニフェストファイルに依存関係としてドライバの 1 つを追加することです。この例では、Redis ドライバを使用します。

// swift-tools-version:5.8
import PackageDescription

let package = Package(
    name: "MyApp",
    dependencies: [
        /// 他の依存関係...
        .package(url: "https://github.com/vapor/queues-redis-driver.git", from: "1.0.0"),
    ],
    targets: [
        .executableTarget(name: "App", dependencies: [
            // 他の依存関係
            .product(name: "QueuesRedisDriver", package: "queues-redis-driver")
        ]),
        .testTarget(name: "AppTests", dependencies: [.target(name: "App")]),
    ]
)

Xcode 内でマニフェストを直接編集した場合、ファイルを保存すると自動的に変更を検出し、新しい依存関係を取得します。それ以外の場合は、ターミナルから swift package resolve を実行して新しい依存関係を取得します。

設定

次のステップは、configure.swift で Queues を設定することです。ここでは、Redis ライブラリを例として使用します:

import QueuesRedisDriver

try app.queues.use(.redis(url: "redis://127.0.0.1:6379"))

Job の登録

ジョブをモデリングした後、次のように configuration セクションに追加する必要があります:

// ジョブを登録
let emailJob = EmailJob()
app.queues.add(emailJob)

プロセスとしてワーカーを実行

新しいキューワーカーを開始するには、swift run App queues を実行します。特定の種類のワーカーを実行する場合は、swift run App queues --queue emails と指定することもできます。

Tip

ワーカーは本番環境で実行し続ける必要があります。長時間実行するプロセスを維持する方法については、ホスティングプロバイダーに従ってください。例えば、Heroku では、Procfile に worker: Run queues のように "worker" dyno を指定できます。これを設定すると、ダッシュボードのリソースタブや heroku ps:scale worker=1(または任意の dyno 数)でワーカーを開始できます。

プロセス内でワーカーを実行

アプリケーションと同じプロセスでワーカーを実行するには(別のサーバーを起動して処理する代わりに)、Application の便利なメソッドを呼び出します:

try app.queues.startInProcessJobs(on: .default)

スケジュールされたジョブをプロセス内で実行するには、次のメソッドを呼び出します:

try app.queues.startScheduledJobs()

Warning

キューワーカーをコマンドラインまたはプロセス内ワーカー経由で起動しない場合、ジョブはディスパッチされません。

Job プロトコル

ジョブは Job または AsyncJob プロトコルで定義されます。

Job オブジェクトのモデリング

import Vapor 
import Foundation 
import Queues 

struct Email: Codable {
    let to: String
    let message: String
}

struct EmailJob: Job {
    typealias Payload = Email

    func dequeue(_ context: QueueContext, _ payload: Email) -> EventLoopFuture<Void> {
        // ここでメールを送信します
        return context.eventLoop.future()
    }

    func error(_ context: QueueContext, _ error: Error, _ payload: Email) -> EventLoopFuture<Void> {
        // エラーを処理しない場合は単に future を返すことができます。また、この関数を完全に省略することもできます。
        return context.eventLoop.future()
    }
}

async/await を使用する場合は、AsyncJob を使用します:

struct EmailJob: AsyncJob {
    typealias Payload = Email

    func dequeue(_ context: QueueContext, _ payload: Email) async throws {
        // ここでメールを送信します
    }

    func error(_ context: QueueContext, _ error: Error, _ payload: Email) async throws {
        // エラーを処理しない場合は単に return します。また、この関数を完全に省略することもできます。
    }
}

Info

Payload 型が Codable プロトコルを実装していることを確認してください。

Tip

Getting Started の指示に従って、このジョブを設定ファイルに追加することを忘れないでください。

ジョブのディスパッチ

キュージョブをディスパッチするには、Application または Request のインスタンスにアクセスする必要があります。ジョブをディスパッチするのは主にルートハンドラー内になるでしょう:

app.get("email") { req -> EventLoopFuture<String> in
    return req
        .queue
        .dispatch(
            EmailJob.self, 
            .init(to: "email@email.com", message: "message")
        ).map { "done" }
}

// or

app.get("email") { req async throws -> String in
    try await req.queue.dispatch(
        EmailJob.self, 
        .init(to: "email@email.com", message: "message"))
    return "done"
}

Request オブジェクトが利用できないコンテキスト(例えば Command 内)でジョブをディスパッチする必要がある場合は、Application オブジェクト内の queues プロパティを使用する必要があります。次のようにします:

struct SendEmailCommand: AsyncCommand {
    func run(using context: CommandContext, signature: Signature) async throws {
        context
            .application
            .queues
            .queue
            .dispatch(
                EmailJob.self, 
                .init(to: "email@email.com", message: "message")
            )
    }
}

maxRetryCount の設定

maxRetryCount を指定した場合、エラーが発生するとジョブは自動的に再試行されます。例えば:

app.get("email") { req -> EventLoopFuture<String> in
    return req
        .queue
        .dispatch(
            EmailJob.self, 
            .init(to: "email@email.com", message: "message"),
            maxRetryCount: 3
        ).map { "done" }
}

// or

app.get("email") { req async throws -> String in
    try await req.queue.dispatch(
        EmailJob.self, 
        .init(to: "email@email.com", message: "message"),
        maxRetryCount: 3)
    return "done"
}

遅延の指定

ジョブを指定した Date が経過してからのみ実行するように設定できます。遅延を指定するには、dispatchdelayUntil パラメータに Date を渡します:

app.get("email") { req async throws -> String in
    let futureDate = Date(timeIntervalSinceNow: 60 * 60 * 24) // 1 日後
    try await req.queue.dispatch(
        EmailJob.self, 
        .init(to: "email@email.com", message: "message"),
        maxRetryCount: 3,
        delayUntil: futureDate)
    return "done"
}

ジョブが delay パラメータの前にデキューされた場合、ドライバによってジョブが再キューされます。

優先度の指定

ジョブは必要に応じて異なるキュータイプ/プライオリティに分類できます。例えば、email キューと background-processing キューを開いてジョブを分類したい場合があります。

まず QueueName を拡張します:

extension QueueName {
    static let emails = QueueName(string: "emails")
}

次に、jobs オブジェクトを取得する際にキュータイプを指定します:

app.get("email") { req -> EventLoopFuture<String> in
    let futureDate = Date(timeIntervalSinceNow: 60 * 60 * 24) // 1 日後
    return req
        .queues(.emails)
        .dispatch(
            EmailJob.self, 
            .init(to: "email@email.com", message: "message"),
            maxRetryCount: 3,
            delayUntil: futureDate
        ).map { "done" }
}

// or

app.get("email") { req async throws -> String in
    let futureDate = Date(timeIntervalSinceNow: 60 * 60 * 24) // 1 日後
    try await req
        .queues(.emails)
        .dispatch(
            EmailJob.self, 
            .init(to: "email@email.com", message: "message"),
            maxRetryCount: 3,
            delayUntil: futureDate
        )
    return "done"
}

Application オブジェクト内からアクセスする場合は、次のようにします:

struct SendEmailCommand: AsyncCommand {
    func run(using context: CommandContext, signature: Signature) async throws {
        context
            .application
            .queues
            .queue(.emails)
            .dispatch(
                EmailJob.self, 
                .init(to: "email@email.com", message: "message"),
                maxRetryCount: 3,
                delayUntil: futureDate
            )
    }
}

キューを指定しない場合、ジョブは default キューで実行されます。各キュータイプのワーカーを起動する手順については、Getting Started の指示に従ってください。

ジョブのスケジューリング

Queues パッケージは、ジョブを特定の時点にスケジュールすることもできます。

Warning

スケジュールされたジョブは、アプリケーションの起動前に configure.swift などで設定する必要があります。ルートハンドラー内では動作しません。

スケジューラワーカーの起動

スケジューラには、キューワーカーと同様に、別のワーカープロセスが必要です。このコマンドを実行してワーカーを起動できます:

swift run App queues --scheduled

Tip

ワーカーは本番環境で実行し続ける必要があります。長時間実行するプロセスを維持する方法については、ホスティングプロバイダーに従ってください。例えば、Heroku では、Procfile に worker: App queues --scheduled と指定することで「worker」 dyno を指定できます。

ScheduledJob の作成

まず、新しい ScheduledJob または AsyncScheduledJob を作成します:

import Vapor
import Queues

struct CleanupJob: ScheduledJob {
    // 追加のサービスが必要な場合は、依存性注入を使用してここに追加します。

    func run(context: QueueContext) -> EventLoopFuture<Void> {
        // ここで何か作業を行い、別のジョブをキューに入れるなどします。
        return context.eventLoop.makeSucceededFuture(())
    }
}

struct CleanupJob: AsyncScheduledJob {
    // 追加のサービスが必要な場合は、依存性注入を使用してここに追加します。

    func run(context: QueueContext) async throws {
        // ここで何か作業を行い、別のジョブをキューに入れるなどします。
    }
}

次に、設定コード内でスケジュールされたジョブを登録します:

app.queues.schedule(CleanupJob())
    .yearly()
    .in(.may)
    .on(23)
    .at(.noon)

上記の例では、ジョブは毎年 5 月 23 日の 12:00 PM に実行されます。

Tip

スケジューラはサーバーのタイムゾーンを考慮します。

利用可能なビルダーメソッド

スケジューラには 5 つの主なメソッドがあり、それぞれがさらにヘルパーメソッドを含むビルダーオブジェクトを作成します。コンパイラが未使用の結果に関する警告を出さなくなるまで、スケジューラオブジェクトを構築し続けます。利用可能なすべてのメソッドは以下のとおりです:

ヘルパー関数 利用可能な修飾子 説明
yearly() in(_ month: Month) -> Monthly ジョブを実行する月。さらに構築するための Monthly オブジェクトを返します。
monthly() on(_ day: Day) -> Daily ジョブを実行する日。さらに構築するための Daily オブジェクトを返します。
weekly() on(_ weekday: Weekday) -> Daily ジョブを実行する曜日。Daily オブジェクトを返します。
daily() at(_ time: Time) ジョブを実行する時間。チェーンの最終メソッド。
at(_ hour: Hour24, _ minute: Minute) ジョブを実行する時間と分。チェーンの最終メソッド。
at(_ hour: Hour12, _ minute: Minute, _ period: HourPeriod) 実行する時間、分、時間帯。チェーンの最終メソッド。
hourly() at(_ minute: Minute) 実行する分。チェーンの最終メソッド。
minutely() at(_ second: Second) 実行する秒。チェーンの最終メソッド。

利用可能なヘルパー

Queues には、スケジューリングを容易にするためのいくつかのヘルパー enum が付属しています:

ヘルパー関数 利用可能なヘルパー enum
yearly() .january,.february,.march, ...
monthly() .first,.last,.exact(1)
weekly() .sunday,.monday,.tuesday, ...
daily() .midnight,.noon

ヘルパー enum を使用するには、ヘルパー関数の適切な修飾子を呼び出し、値を渡します。例えば:

// 毎年 1 月 
.yearly().in(.january)

// 毎月 1 日 
.monthly().on(.first)

// 毎週日曜日 
.weekly().on(.sunday)

// 毎日深夜
.daily().at(.midnight)

イベントデリゲート

Queues パッケージでは、ワーカーがジョブに対してアクションを取ったときに通知を受け取る JobEventDelegate オブジェクトを指定することができます。これは、モニタリング、インサイトの表示、またはアラートの目的で使用できます。

始めるには、オブジェクトを JobEventDelegate に準拠させ、必要なメソッドを実装します:

struct MyEventDelegate: JobEventDelegate {
    /// ジョブがルートからキューワーカーにディスパッチされたときに呼び出されます
    func dispatched(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture<Void> {
        eventLoop.future()
    }

    /// ジョブが処理キューに置かれ、作業が開始されたときに呼び出されます
    func didDequeue(jobId: String, eventLoop: EventLoop) -> EventLoopFuture<Void> {
        eventLoop.future()
    }

    /// ジョブが処理を完了し、キューから削除されたときに呼び出されます
    func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture<Void> {
        eventLoop.future()
    }

    /// ジョブが処理を完了したがエラーが発生したときに呼び出されます
    func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture<Void> {
        eventLoop.future()
    }
}

次に、設定ファイルに追加します:

app.queues.add(MyEventDelegate())

キューワーカーに関する追加のインサイトを提供するために、デリゲート機能を使用するサードパーティパッケージがいくつかあります: