Queues¶
Vapor Queues (vapor/queues) is een puur Swift wachtrijsysteem waarmee je taakverantwoordelijkheid kunt overdragen aan een side worker.
Sommige van de taken waar dit pakket goed voor werkt:
- E-mails versturen buiten de main request thread
- Het uitvoeren van complexe of langlopende database operaties
- Zorgen voor integriteit en bestendigheid van het werk
- Snellere reactietijd door uitstel van niet-kritieke bewerkingen
- Taken plannen om op een specifiek tijdstip te gebeuren
Dit pakket is vergelijkbaar met Ruby Sidekiq. Het biedt de volgende mogelijkheden aan:
- Veilig omgaan met
SIGTERM
enSIGINT
signalen die door hosting providers worden gestuurd om een shutdown, herstart, of nieuwe deploy aan te geven. - Verschillende wachtrijprioriteiten. U kunt bijvoorbeeld een wachtrijtaak opgeven die op de e-mailwachtrij moet worden uitgevoerd en een andere taak die op de gegevensverwerkingswachtrij moet worden uitgevoerd.
- Implementeert het betrouwbare wachtrijproces om te helpen bij onverwachte storingen.
- Bevat een
maxRetryCount
functie die de opdracht zal herhalen totdat deze slaagt tot een gespecificeerd aantal. - Gebruikt NIO om alle beschikbare cores te gebruiken en EventLoops voor taken.
- Hiermee kunnen gebruikers herhalende taken plannen
Queues heeft momenteel één officieel ondersteund stuurprogramma dat een interface heeft met het hoofdprotocol:
Queues heeft ook community-based drivers: - QueuesMongoDriver - QueuesFluentDriver
Tip
U moet het vapor/queues
pakket niet direct installeren, tenzij u een nieuw stuurprogramma bouwt. Installeer in plaats daarvan een van de stuurprogrammapakketten.
Aan De Slag¶
Laten we eens kijken hoe je aan de slag kunt met wachtrijen.
Package¶
De eerste stap om Queues te gebruiken is het toevoegen van een van de stuurprogramma's als een afhankelijkheid van je project in je SwiftPM package manifest bestand. In dit voorbeeld gebruiken we het Redis stuurprogramma.
// swift-tools-version:5.8
import PackageDescription
let package = Package(
name: "MyApp",
dependencies: [
/// Any other dependencies ...
.package(url: "https://github.com/vapor/queues-redis-driver.git", from: "1.0.0"),
],
targets: [
.executableTarget(name: "App", dependencies: [
// Other dependencies
.product(name: "QueuesRedisDriver", package: "queues-redis-driver")
]),
.testTarget(name: "AppTests", dependencies: [.target(name: "App")]),
]
)
Als u het manifest direct in Xcode bewerkt, zal het automatisch de wijzigingen oppikken en de nieuwe dependency ophalen wanneer het bestand wordt opgeslagen. Anders, voer swift package resolve
uit vanuit Terminal om de nieuwe dependency op te halen.
Configuratie¶
De volgende stap is het configureren van wachtrijen in configure.swift
. We gebruiken de Redis bibliotheek als voorbeeld:
import QueuesRedisDriver
try app.queues.use(.redis(url: "redis://127.0.0.1:6379"))
Registreren van een Job
¶
Na het modelleren van een job moet u deze toevoegen aan uw configuratiesectie zoals dit:
//Registreer jobs
let emailJob = EmailJob()
app.queues.add(emailJob)
Workers Uitvoeren Als Processen¶
Om een nieuwe wachtrijwerker te starten, voer swift run App queues
uit. U kunt ook een specifiek type werker specificeren om te draaien: swift run App queues --queue emails
.
Tip
Workers moeten blijven draaien in productie. Raadpleeg uw hosting provider om uit te vinden hoe u langlopende processen in leven kunt houden. Heroku, bijvoorbeeld, staat je toe om "worker" dyno's te specificeren zoals dit in je Procfile: worker: Run queues
. Met dit in plaats, kun je workers starten op het Dashboard/Resources tab, of met heroku ps:scale worker=1
(of elk gewenst aantal dynos).
Lopende Workers in uitvoering¶
Om een worker te draaien in hetzelfde proces als je applicatie (in tegenstelling tot het starten van een hele aparte server om het af te handelen), roep de convenience methodes op in Application
:
try app.queues.startInProcessJobs(on: .default)
Om geplande taken in proces uit te voeren, roept u de volgende methode op:
try app.queues.startScheduledJobs()
Waarschuwing
Als je de wachtrij-werker niet start, hetzij via de opdrachtregel, hetzij via de in-proces-werker, zullen de jobs niet worden verzonden.
Het Job
Protocol¶
Jobs worden gedefinieerd door het Job
of AsyncJob
protocol.
Het modelleren van een Job
object:¶
import Vapor
import Foundation
import Queues
struct Email: Codable {
let to: String
let message: String
}
struct EmailJob: Job {
typealias Payload = Email
func dequeue(_ context: QueueContext, _ payload: Email) -> EventLoopFuture<Void> {
// Dit is waar je de e-mail zou sturen
return context.eventLoop.future()
}
func error(_ context: QueueContext, _ error: Error, _ payload: Email) -> EventLoopFuture<Void> {
// Als u geen fouten wilt afhandelen, kunt u gewoon een toekomst teruggeven. U kunt deze functie ook helemaal weglaten.
return context.eventLoop.future()
}
}
Als je async
/await
gebruikt moet je AsyncJob
gebruiken:
struct EmailJob: AsyncJob {
typealias Payload = Email
func dequeue(_ context: QueueContext, _ payload: Email) async throws {
// Dit is waar je de e-mail zou sturen
}
func error(_ context: QueueContext, _ error: Error, _ payload: Email) async throws {
// Als u geen fouten wilt afhandelen, kunt u gewoon terugkeren. Je kunt deze functie ook helemaal weglaten.
}
}
Info
Zorg ervoor dat uw Payload
type het Codable
protocol implementeert.
Tip
Vergeet niet de instructies in Aan De Slag te volgen om deze taak aan uw configuratiebestand toe te voegen.
Jobs Dispatchen¶
Om een wachtrijjob te dispatchen, heb je toegang nodig tot een instantie van Application
of Request
. Je zult waarschijnlijk jobs versturen binnen een route handler:
app.get("email") { req -> EventLoopFuture<String> in
return req
.queue
.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message")
).map { "done" }
}
// of
app.get("email") { req async throws -> String in
try await req.queue.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"))
return "done"
}
Als u in plaats daarvan een opdracht moet verzenden vanuit een context waar het Request
object niet beschikbaar is (zoals bijvoorbeeld vanuit een Command
), dan moet u de queues
eigenschap binnen het Application
object gebruiken, zoals bijvoorbeeld:
struct SendEmailCommand: AsyncCommand {
func run(using context: CommandContext, signature: Signature) async throws {
context
.application
.queues
.queue
.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message")
)
}
}
Instelling maxRetryCount
¶
Jobs zullen zichzelf automatisch opnieuw proberen bij een fout als je een maxRetryCount
opgeeft. Bijvoorbeeld:
app.get("email") { req -> EventLoopFuture<String> in
return req
.queue
.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"),
maxRetryCount: 3
).map { "done" }
}
// of
app.get("email") { req async throws -> String in
try await req.queue.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"),
maxRetryCount: 3)
return "done"
}
Een Vertraging Opgeven¶
Jobs kunnen ook zo worden ingesteld dat ze pas worden uitgevoerd als een bepaalde Datum
is verstreken. Om een vertraging op te geven, geef je een Datum
op in de delayUntil
parameter in dispatch
:
app.get("email") { req async throws -> String in
let futureDate = Date(timeIntervalSinceNow: 60 * 60 * 24) // 1 dag
try await req.queue.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"),
maxRetryCount: 3,
delayUntil: futureDate)
return "done"
}
Als een opdracht vóór de vertragingsparameter wordt verwijderd, wordt de opdracht door de bestuurder opnieuw geplaatst.
Geef Een Prioriteit Aan¶
Jobs kunnen gesorteerd worden in verschillende wachtrij types/prioriteiten afhankelijk van uw behoeften. U kunt bijvoorbeeld een email
wachtrij en een background-processing
wachtrij openen om taken te sorteren.
Begin met QueueName
uit te breiden:
extension QueueName {
static let emails = QueueName(string: "emails")
}
Dan, specificeer het wachtrij type wanneer je het jobs
object ophaalt:
app.get("email") { req -> EventLoopFuture<String> in
let futureDate = Date(timeIntervalSinceNow: 60 * 60 * 24) // 1 dag
return req
.queues(.emails)
.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"),
maxRetryCount: 3,
delayUntil: futureDate
).map { "done" }
}
// of
app.get("email") { req async throws -> String in
let futureDate = Date(timeIntervalSinceNow: 60 * 60 * 24) // 1 dag
try await req
.queues(.emails)
.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"),
maxRetryCount: 3,
delayUntil: futureDate
)
return "done"
}
Bij het benaderen vanuit het Application
object moet je als volgt te werk gaan:
struct SendEmailCommand: AsyncCommand {
func run(using context: CommandContext, signature: Signature) async throws {
context
.application
.queues
.queue(.emails)
.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"),
maxRetryCount: 3,
delayUntil: futureDate
)
}
}
Als u geen wachtrij opgeeft, wordt de taak uitgevoerd op de standaard
wachtrij. Zorg ervoor dat u de instructies in Aan De Slag volgt om werkers voor elk wachtrijtype te starten.
Jobs Inplannen¶
Met het pakket Queues kunt u ook taken plannen die op bepaalde tijdstippen moeten worden uitgevoerd.
Waarschuwing
Geplande taken werken alleen als ze zijn ingesteld voordat de applicatie opstart, zoals in configure.swift
. Ze werken niet in route handlers.
De planner starten¶
De scheduler vereist dat een afzonderlijk workerproces draait, gelijkaardig aan de queue worker. U kunt de worker starten door dit commando uit te voeren:
swift run App queues --scheduled
Tip
Workers moeten blijven draaien in productie. Raadpleeg uw hosting provider om uit te vinden hoe u langlopende processen in leven kunt houden. Heroku, bijvoorbeeld, staat je toe om "worker" dyno's te specificeren zoals dit in je Procfile: worker: App queues --scheduled
Een ScheduledJob
Maken¶
Om te beginnen, maak je een nieuwe ScheduledJob
of AsyncScheduledJob
:
import Vapor
import Queues
struct CleanupJob: ScheduledJob {
// Voeg hier extra diensten toe via dependency injection, als je die nodig hebt.
func run(context: QueueContext) -> EventLoopFuture<Void> {
// Doe wat werk hier, misschien een andere job.
return context.eventLoop.makeSucceededFuture(())
}
}
struct CleanupJob: AsyncScheduledJob {
// Voeg hier extra diensten toe via dependency injection, als je die nodig hebt.
func run(context: QueueContext) async throws {
// Doe wat werk hier, misschien een andere job.
}
}
Registreer vervolgens in uw configureercode de geplande job:
app.queues.schedule(CleanupJob())
.yearly()
.in(.may)
.on(23)
.at(.noon)
De opdracht in het bovenstaande voorbeeld wordt elk jaar uitgevoerd op 23 mei om 12:00 PM.
Tip
De Scheduler neemt de tijdzone van uw server.
Beschikbare bouwmethodes¶
Er zijn vijf hoofdmethoden die aangeroepen kunnen worden op een scheduler, die elk hun eigen builder object maken dat meer helper methoden bevat. U moet doorgaan met het bouwen van een scheduler-object totdat de compiler u geen waarschuwing geeft over een ongebruikt resultaat. Zie hieronder voor alle beschikbare methoden:
Helper Functie | Beschikbare Modifiers | Beschrijving |
---|---|---|
yearly() |
in(_ month: Month) -> Monthly |
De maand om de job in uit te voeren. Geeft een Maand object voor verdere opbouw. |
monthly() |
on(_ day: Day) -> Daily |
De dag om de job uit te voeren. Geeft een Daily object voor verdere opbouw. |
weekly() |
on(_ weekday: Weekday) -> Daily |
De dag van de week om de job op uit te voeren. Geeft als resultaat een Daily object. |
daily() |
at(_ time: Time) |
De tijd om de opdracht uit te voeren. Laatste methode in de keten. |
at(_ hour: Hour24, _ minute: Minute) |
Het uur en de minuten om de job uit te voeren. Laatste methode in de keten. | |
at(_ hour: Hour12, _ minute: Minute, _ period: HourPeriod) |
Het uur, de minuten en de periode om de job uit te voeren. Eindmethode van de keten | |
hourly() |
at(_ minute: Minute) |
De minuut om de opdracht uit te voeren. De laatste methode van de ketting. |
minutely() |
at(_ second: Second) |
De seconde om de opdracht uit te voeren. De laatste methode van de ketting. |
Beschikbare helpers¶
Wachtrijen worden geleverd met enkele helpers enums om het plannen te vergemakkelijken:
Helper Functie | Beschikbare Helper Enum |
---|---|
yearly() |
.january , .february , .march , ... |
monthly() |
.first , .last , .exact(1) |
weekly() |
.sunday , .monday , .tuesday , ... |
daily() |
.midnight , .noon |
Om de helper-enum te gebruiken, roep de juiste modifier aan op de helperfunctie en geef de waarde door. Bijvoorbeeld:
// Elk jaar in januari
.yearly().in(.january)
// Elke maand op de eerste dag
.monthly().on(.first)
// Elke week op zondag
.weekly().on(.sunday)
// Elke dag om middernacht
.daily().at(.midnight)
Event Delegates¶
Het Queues pakket maakt het mogelijk om JobEventDelegate
objecten te specificeren die notificaties zullen ontvangen wanneer de werker actie onderneemt op een job. Dit kan gebruikt worden voor monitoring, inzichten te verschaffen, of waarschuwingsdoeleinden.
Om te beginnen, conformeer een object aan JobEventDelegate
en implementeer alle vereiste methodes
struct MyEventDelegate: JobEventDelegate {
/// Wordt aangeroepen wanneer de taak wordt verzonden naar de wachtrijwerker vanuit een route
func dispatched(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.future()
}
/// Wordt aangeroepen wanneer de taak in de verwerkingswachtrij wordt geplaatst en het werk begint
func didDequeue(jobId: String, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.future()
}
/// Wordt aangeroepen wanneer de taak klaar is met verwerken en verwijderd is uit de wachtrij
func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.future()
}
/// Wordt aangeroepen wanneer de opdracht klaar is met verwerken maar een fout had
func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.future()
}
}
Voeg het dan toe in uw configuratiebestand:
app.queues.add(MyEventDelegate())
Er zijn een aantal pakketten van derden die de delegate-functionaliteit gebruiken om extra inzicht te verschaffen in uw wachtrijwerkers: