
Disque

A non-blocking, event-driven Swift client for Disque, ~~shamelessly copied~~ built on Vapor's Redis client. Disque is a distributed job queue created by Salvatore Sanfilippo (@antirez) "forked" off of Redis.

Disque Features

  • Distributed - Disque clusters are multi-master with each node having the same role so producers and consumers can attach to any node. There is no requirement for producers and consumers of the same queue to be connected to the same node.
  • Fault Tolerant - Jobs are synchronously replicated to multiple nodes: when a job is added to a queue, it is replicated to N nodes before the node returns the job ID to the client. N-1 nodes can fail and the job will still be delivered.
  • Tunable Persistence - By default, Disque stores jobs in memory. It can be configured to persist jobs on disk by turning on AOF (similar to Redis).
  • Customizable Delivery Semantics - By default, jobs have at-least-once delivery semantics, but if the retry time is set to 0, jobs have at-most-once delivery semantics. For at-least-once delivery, jobs are automatically re-queued until the job is acknowledged by a consumer or until the job expires.
  • Job Priority - Consumers can acquire jobs from multiple queues with a single GETJOB command. Disque searches the listed queues for jobs starting with the left-most queue, which simulates job priority.
  • Scheduled/Delayed Jobs - Jobs can be scheduled/delayed for a set period of time. Delayed jobs are not delivered to consumers until the delay has expired.
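
Two of the properties above come down to simple rules, which can be sketched in a few lines of Swift. This is a hypothetical model for illustration only: `DeliverySemantics`, `deliverySemantics(retrySeconds:)`, and `tolerableNodeFailures(replicationFactor:)` are names invented here and are not part of Disque or of this client.

```swift
// Hypothetical model of two rules from the feature list; not Disque or client code.

/// Delivery guarantee implied by a job's retry setting.
enum DeliverySemantics {
    case atMostOnce
    case atLeastOnce
}

/// A retry time of 0 disables re-queueing, so the job is delivered at most once.
/// Any other retry time means the job is re-queued until it is acked or expires.
func deliverySemantics(retrySeconds: Int) -> DeliverySemantics {
    return retrySeconds == 0 ? .atMostOnce : .atLeastOnce
}

/// With a job replicated to N nodes, N-1 of them can fail and one copy still survives.
func tolerableNodeFailures(replicationFactor n: Int) -> Int {
    return max(n - 1, 0)
}
```

For example, a job replicated to three nodes survives the loss of two of them, and a job added with a retry time of 0 is never re-queued.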

Installation and Setup

1. Start a Disque container using Docker or follow Disque's installation instructions.

docker run -d --rm -p 7711:7711 --name disque efrecon/disque:1.0-rc1

2. Add the dependency to Package.swift.

.package(url: "https://github.com/johnbona/disque", from: "0.1.0")

3. Register the provider and config in configure.swift.

public func configure(_ config: inout Config, _ env: inout Environment, _ services: inout Services) throws {
	// ...

	try services.register(DisqueProvider())

	var databases = DatabasesConfig()
	let disqueConfig = DisqueDatabase(config: DisqueClientConfig())

	databases.add(database: disqueConfig, as: .disque)
	services.register(databases)
}

4. Fetch a connection from the connection pool and use the Disque client.

return req.withPooledConnection(to: .disque) { disque -> Future<[Job<TestJob>]> in
	return disque.get(count: 1, from: ["test-queue"], as: TestJob.self)
}

Usage

All of Disque's commands (except for HELLO, QPEEK, QSCAN, JSCAN) are implemented in Swift with job bodies conforming to Codable.
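
The examples in this section use a `TestJob` type that is never defined. A minimal sketch of what such a job body might look like follows. The `message` field and the `roundTrip(_:)` helper are assumptions made for illustration, and JSON is used only to show a Codable round trip; the encoding the client actually uses on the wire is not stated here.

```swift
import Foundation

// Hypothetical job body: the README uses `TestJob` without showing its definition.
// The default value lets `TestJob()` be created with no arguments, as in the examples.
struct TestJob: Codable, Equatable {
    var message: String = "hello"
}

// Encodes a job and decodes it again to confirm that it survives a Codable round trip.
// JSON is an assumption for demonstration, not necessarily what the client uses.
func roundTrip(_ job: TestJob) throws -> TestJob {
    let data = try JSONEncoder().encode(job)
    return try JSONDecoder().decode(TestJob.self, from: data)
}
```

Any type that passes a round trip like this should be usable as a job body, since Codable conformance is the only requirement stated above.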

Get a job from the queue

return req.withPooledConnection(to: .disque) { disque -> Future<[Job<TestJob>]> in
	return disque.get(count: 1, from: ["test-queue"], as: TestJob.self)
}

Get a job from multiple queues (job priority)

return req.withPooledConnection(to: .disque) { disque -> Future<[Job<TestJob>]> in
	return disque.get(count: 1, from: ["high-priority", "low-priority"], as: TestJob.self)
}
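
The left-to-right search behind job priority can be modeled with a small in-memory sketch. This is not how Disque is implemented and it does not call the client: `takeJobs(count:from:in:)` is a hypothetical helper, and the queues are plain arrays of job names.

```swift
// Hypothetical in-memory model of left-to-right queue scanning; not Disque or client code.
// Takes up to `count` jobs, draining earlier queues in `queueNames` before later ones.
func takeJobs(count: Int, from queueNames: [String], in queues: inout [String: [String]]) -> [String] {
    var taken: [String] = []
    for name in queueNames {
        var pending = queues[name] ?? []
        // Pull jobs from the front of this queue until we have enough or it is empty.
        while taken.count < count && !pending.isEmpty {
            taken.append(pending.removeFirst())
        }
        queues[name] = pending
        if taken.count == count { break }
    }
    return taken
}
```

In this model, a job waiting in "low-priority" is handed out only once "high-priority" is empty, which is the effect the queue ordering above is meant to achieve.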

Add a job to a queue

return req.withPooledConnection(to: .disque) { disque -> Future<String> in
	return try disque.add(job: TestJob(), to: "test-queue")
}

Complete (ack) a job

return req.withPooledConnection(to: .disque) { disque -> Future<Int> in
	return try disque.ack(jobIDs: ["D-00000000-000000000000000000000000-0000"])
}
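
Before acking, it can be useful to reject strings that are obviously not job IDs. The sketch below assumes the layout suggested by the placeholder ID above: the letter D, then dash-separated parts of 8, 24, and 4 characters, for 40 characters in total. `looksLikeJobID(_:)` is a hypothetical helper, not part of this client, and it checks only the shape of the string, not whether the job exists.

```swift
// Hypothetical shape check for a job ID; not part of the Disque client.
// Assumes the layout "D-<8 chars>-<24 chars>-<4 chars>" (40 characters in total).
func looksLikeJobID(_ id: String) -> Bool {
    // Keep empty parts so that malformed input such as "D--" is not silently collapsed.
    let parts = id.split(separator: "-", omittingEmptySubsequences: false)
    return parts.map { $0.count } == [1, 8, 24, 4] && parts[0] == "D"
}
```

A check like this only filters out malformed input early; the server remains the authority on whether an ID refers to a real job.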

Resources

License

Disque is released under the MIT License.

Releases

0.1.0 - Sep 26, 2018