Swiftpack.co - orlandos-nl/MongoQueue as Swift Package

orlandos-nl/MongoQueue 1.0.1
MongoDB Job Queue in Swift
iOS macOS Linux
.package(url: "https://github.com/orlandos-nl/MongoQueue.git", from: "1.0.1")

MongoQueue

A MongoKitten based JobQueue for MongoDB.

Join our Discord for any questions and friendly banter.

Read the Docs for more info.

Quick Start

Connect to MongoDB with MongoKitten as usual:

let db = try await MongoDatabase.connect(to: "mongodb://localhost/my_database")

Select a collection for your job queue:

let queue = MongoQueue(collection: db["tasks"])

Start the queue in the background (for use inside HTTP applications):

try queue.runInBackground()

Alternatively, run the queue in the foreground and block until the queue is stopped. Only use this if the process is a dedicated queue worker, i.e. it isn't also serving users on the side.

try await queue.run()

Define your jobs by conforming to ScheduledTask (and implicitly Codable):

struct RegistrationEmailTask: ScheduledTask {
    // Computed property, required by ScheduledTask
    // This executes the task ASAP
    var taskExecutionDate: Date { Date() }
    
    // Stored properties represent the metadata needed to execute the task
    let recipientEmail: String
    let userId: ObjectId
    let fullName: String
    
    func execute(withContext context: Void) async throws {
        // TODO: Send the email
        // Throwing an error triggers `onExecutionFailure`
    }
    
    func onExecutionFailure(failureContext: QueuedTaskFailure<()>) async throws -> TaskExecutionFailureAction {
        // Only attempt the job once. Failing to send the email cancels the job
        return .dequeue()
    }
}

Queue the task in MongoDB:

let task = RegistrationEmailTask(
  recipientEmail: "[email protected]",
  userId: ...,
  fullName: "Joannis Orlandos"
)
try await queue.queueTask(task)

Tada! Just wait for it to be executed.

Advanced Use

Before diving into more (detailed) APIs, here's an overview of how this works:

When you queue a task, MongoQueue derives the basic information for queueing the job from the task itself. Some of these requirements are part of the protocol, but MongoQueue provides default values for them.

Dequeuing Process

Each task has a category, a unique string identifying this task's type in the database. When you register your task with MongoQueue, the category is used to know how to decode & execute the task once it is acquired by a worker.
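
The role of the category can be sketched with a small, self-contained model. This is not MongoQueue's implementation: SketchTask, TaskRegistry and the JSON payload are assumptions made for illustration (the real library stores tasks in MongoDB). It only shows how a category string can select the type used to decode and execute a stored task.

```swift
import Foundation

// Hypothetical stand-in for a queueable task; not MongoQueue's ScheduledTask.
protocol SketchTask: Codable {
    static var category: String { get }
    func run() -> String
}

struct WelcomeEmail: SketchTask {
    static let category = "welcome-email"
    let recipient: String
    func run() -> String { "email to \(recipient)" }
}

// Maps a category string to a closure that decodes the payload and runs it.
struct TaskRegistry {
    private var handlers: [String: (Data) throws -> String] = [:]

    mutating func register<T: SketchTask>(_ type: T.Type) {
        handlers[T.category] = { payload in
            try JSONDecoder().decode(T.self, from: payload).run()
        }
    }

    // Returns nil when no task type was registered for the category.
    func execute(category: String, payload: Data) throws -> String? {
        try handlers[category]?(payload)
    }
}

var registry = TaskRegistry()
registry.register(WelcomeEmail.self)

let payload = try JSONEncoder().encode(WelcomeEmail(recipient: "ada"))
let result = try registry.execute(category: "welcome-email", payload: payload)
let missing = try registry.execute(category: "unknown", payload: payload)
```

In this model a payload whose category was never registered cannot be decoded, which is why registration has to happen before a worker starts pulling tasks.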

MongoQueue regularly checks, on a timer (and, if possible, with Change Streams for better responsiveness), whether a new task is ready to grab. When it pulls a task from MongoDB, it takes the highest-priority task that is due for execution at that moment.

The priority is .normal by default, but urgency can be increased or decreased through the task's var priority: TaskPriority { get }.
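
As a rough sketch of that selection rule, the model below picks the highest-priority item among those already due. The types SketchPriority and QueuedItem, the three-level priority scale, and the tie-break on the earliest date are all assumptions for illustration. The real selection happens in a MongoDB query inside MongoQueue.

```swift
import Foundation

// Hypothetical priority scale; only `.normal` is named in the text above.
enum SketchPriority: Int, Comparable {
    case low = -1, normal = 0, high = 1

    static func < (lhs: Self, rhs: Self) -> Bool { lhs.rawValue < rhs.rawValue }
}

// Hypothetical in-memory stand-in for a queued task document.
struct QueuedItem {
    let name: String
    let executeAfter: Date
    let priority: SketchPriority
}

// Picks the highest-priority item that is already due.
// Assumed tie-break: among equal priorities, the earliest date wins.
func nextItem(in items: [QueuedItem], now: Date) -> QueuedItem? {
    items
        .filter { $0.executeAfter <= now }
        .max { a, b in
            if a.priority != b.priority { return a.priority < b.priority }
            return a.executeAfter > b.executeAfter
        }
}

let now = Date()
let items = [
    QueuedItem(name: "report", executeAfter: now.addingTimeInterval(-60), priority: .normal),
    QueuedItem(name: "alert", executeAfter: now.addingTimeInterval(-5), priority: .high),
    QueuedItem(name: "future", executeAfter: now.addingTimeInterval(3600), priority: .high),
]
let picked = nextItem(in: items, now: now)
```

Here "future" is skipped despite its high priority because it is not due yet, and "alert" beats "report" on priority.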

When the task is taken out of the queue, its status is set to executing. This means that other workers can't execute this task right now. At the same time, the task model's maxTaskDuration is used as an indication of the expected duration of the task. The expected deadline is set on the model in MongoDB by adding maxTaskDuration to the current date.

If the deadline is reached, other workers can (and will) dequeue the task and put it back into scheduled, on the assumption that the original worker has crashed. However, when a task simply takes longer than expected, the worker that is still executing it will extend the deadline accordingly.

Because of this system, it is advised to give urgent and short-lived tasks a shorter maxTaskDuration. But take network connectivity into consideration, as setting it very low (like 5 seconds) may cause the deadline to be reached before it can be prolonged.
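
The deadline mechanism can be modelled in a few lines. The Lease type and its method names below are hypothetical and exist only to illustrate the arithmetic described above. MongoQueue keeps the equivalent state on the task document in MongoDB.

```swift
import Foundation

// Hypothetical model of the deadline kept on an executing task.
struct Lease {
    var deadline: Date

    // Deadline = time of acquisition + maxTaskDuration, as described above.
    init(acquiredAt: Date, maxTaskDuration: TimeInterval) {
        deadline = acquiredAt.addingTimeInterval(maxTaskDuration)
    }

    // Another worker may reclaim the task once the deadline has been reached.
    func isStale(at now: Date) -> Bool { now >= deadline }

    // A worker that is still running pushes the deadline forward.
    mutating func prolong(from now: Date, by maxTaskDuration: TimeInterval) {
        deadline = now.addingTimeInterval(maxTaskDuration)
    }
}

let start = Date(timeIntervalSince1970: 0)
var lease = Lease(acquiredAt: start, maxTaskDuration: 30)

// 20 seconds in: the deadline (t = 30) has not been reached.
let staleEarly = lease.isStale(at: start.addingTimeInterval(20))

// The worker is still alive and extends the deadline to t = 50.
lease.prolong(from: start.addingTimeInterval(20), by: 30)
let staleAfterProlong = lease.isStale(at: start.addingTimeInterval(40))

// No further extension arrives, so at t = 60 the task can be reclaimed.
let staleAfterSilence = lease.isStale(at: start.addingTimeInterval(60))
```

The model also shows why a very short maxTaskDuration is risky: if the call to prolong is delayed past the deadline, the task already counts as stale.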

If the task is dequeued, your task model gets a notification in func onDequeueTask(withId taskId: ObjectId, withContext context: ExecutionContext, inQueue queue: MongoQueue) async throws.

Likewise, on execution failure you get a call on func onExecutionFailure(failureContext: QueuedTaskFailure<ExecutionContext>) async throws -> TaskExecutionFailureAction, where you can decide whether to requeue the task and whether to apply a maximum number of attempts.
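
The decision made in onExecutionFailure boils down to a small policy. The sketch below models that policy on its own, with a hypothetical SketchFailureAction enum and a hypothetical attemptsMade counter. It does not use MongoQueue's TaskExecutionFailureAction, whose cases may differ apart from the .dequeue() shown in the Quick Start.

```swift
// Hypothetical stand-in for the action returned after a failed execution.
enum SketchFailureAction: Equatable {
    case requeue   // put the task back so it is attempted again
    case dequeue   // give up and remove the task from the queue
}

// Requeues until `maxAttempts` executions have been made, then gives up.
func decide(attemptsMade: Int, maxAttempts: Int) -> SketchFailureAction {
    attemptsMade < maxAttempts ? .requeue : .dequeue
}

let afterFirstFailure = decide(attemptsMade: 1, maxAttempts: 3)
let afterThirdFailure = decide(attemptsMade: 3, maxAttempts: 3)
```

With maxAttempts set to 1 this reduces to the Quick Start behaviour, where a single failure cancels the job.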


Release Notes

1.0.0
35 weeks ago

Full Changelog: https://github.com/orlandos-nl/MongoQueue/compare/0.2.3...1.0.0
