Class: Tennis::Backend::Rabbit
- Inherits: Abstract
  - Object
  - Abstract
  - Tennis::Backend::Rabbit
- Defined in: lib/tennis/backend/rabbit.rb
Instance Method Summary
- #ack(task) ⇒ Object
- #enqueue(job:, method:, args:, delay: nil) ⇒ Object
  Delayed jobs are not yet supported with the Rabbit backend.
- #initialize(logger:, url:, namespace: "tennis") ⇒ Rabbit (constructor)
  A new instance of Rabbit.
- #receive(job_classes:, timeout: 1.0) ⇒ Object
- #requeue(task) ⇒ Object
- #reset ⇒ Object
  Reset the backend: stop getting messages from the queues and requeue unacked messages.
Constructor Details
#initialize(logger:, url:, namespace: "tennis") ⇒ Rabbit
Returns a new instance of Rabbit.
# File 'lib/tennis/backend/rabbit.rb', line 15
def initialize(logger:, url:, namespace: "tennis")
  super(logger: logger)
  @rabbit_url = url
  @rabbit_namespace = namespace
  @payload_queue = RabbitQueue.new
  @job_classes = nil
  @setup = false
end
Instance Method Details
#ack(task) ⇒ Object
# File 'lib/tennis/backend/rabbit.rb', line 38
def ack(task)
  delivery_info = task.["_backend"]["delivery_info"]
  channel.acknowledge(delivery_info.delivery_tag, false)
end
#enqueue(job:, method:, args:, delay: nil) ⇒ Object
Delayed jobs are not yet supported with the Rabbit backend: the delay: argument is accepted, but the method body does not use it.
# File 'lib/tennis/backend/rabbit.rb', line 25
def enqueue(job:, method:, args:, delay: nil)
  queue(job.class)
   = { "enqueued_at" => Time.now.to_i }
  task = Task.new(self, generate_task_id, job, method, args, )
  exchange.publish(serialize(task), routing_key: routing_key(job.class))
end
#receive(job_classes:, timeout: 1.0) ⇒ Object
# File 'lib/tennis/backend/rabbit.rb', line 32
def receive(job_classes:, timeout: 1.0)
  setup(job_classes)
  payload = @payload_queue.pop(timeout)
  payload && deserialize_task(*payload)
end
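#receive relies on a queue whose pop gives up after timeout seconds and returns nil, which is why the last line guards with payload &&. The source for RabbitQueue is not shown on this page, so the following is only a minimal sketch of how such a timed pop could work, under that assumption. TimedQueue and its push method are hypothetical names introduced for illustration.

```ruby
# Hypothetical stand-in for RabbitQueue; not the library's implementation.
class TimedQueue
  def initialize
    @items = []
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
  end

  # Would be called from the consumer side when a message arrives.
  def push(item)
    @mutex.synchronize do
      @items << item
      @cond.signal
    end
  end

  # Wait up to `timeout` seconds for an item; return nil when none arrives.
  def pop(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      while @items.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        @cond.wait(@mutex, remaining)
      end
      @items.shift
    end
  end
end

queue = TimedQueue.new
queue.push(["delivery_info", "properties", "payload"])

first  = queue.pop(0.1)   # an item is already waiting, so no blocking occurs
second = queue.pop(0.05)  # the queue is empty, so the call times out
```

With a queue like this, a nil result lets the caller loop back and poll again instead of blocking forever.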
#requeue(task) ⇒ Object
# File 'lib/tennis/backend/rabbit.rb', line 43
def requeue(task)
  delivery_info = task.["_backend"]["delivery_info"]
  channel.reject(delivery_info.delivery_tag, true)
end
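#ack and #requeue look almost identical, but the trailing boolean means something different in each call. In AMQP 0-9-1, basic.ack takes (delivery-tag, multiple) and basic.reject takes (delivery-tag, requeue). So #ack acknowledges only the single delivery, while #requeue rejects it and asks the broker to put it back on the queue. The sketch below assumes channel follows that argument order (as a Bunny channel does), which this page does not state. RecordingChannel and DeliveryInfo are hypothetical doubles that only record what they are sent.

```ruby
# Hypothetical test double; it only records the calls it receives.
class RecordingChannel
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Argument order assumed from AMQP basic.ack: (delivery_tag, multiple).
  def acknowledge(delivery_tag, multiple = false)
    @calls << [:acknowledge, delivery_tag, multiple]
  end

  # Argument order assumed from AMQP basic.reject: (delivery_tag, requeue).
  def reject(delivery_tag, requeue = false)
    @calls << [:reject, delivery_tag, requeue]
  end
end

# Hypothetical stand-in for the delivery info stored with a task.
DeliveryInfo = Struct.new(:delivery_tag)

channel = RecordingChannel.new
info = DeliveryInfo.new(7)

channel.acknowledge(info.delivery_tag, false) # what #ack sends: multiple = false
channel.reject(info.delivery_tag, true)       # what #requeue sends: requeue = true
```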
#reset ⇒ Object
Reset the backend:
- stop getting messages from the queues
- requeue unacked messages
# File 'lib/tennis/backend/rabbit.rb', line 51
def reset
  return unless @setup
  @consumers.each(&:cancel)
  @job_classes = nil
  @setup = false
end
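The @setup guard makes #reset safe to call at any time. Before setup it does nothing, and after setup it cancels every consumer and clears the flag so a later #receive can set things up again. The following sketch reproduces only that guard-and-cancel flow. FakeConsumer, ResettableBackend, and the body of setup are hypothetical, since the real setup(job_classes) is not shown on this page.

```ruby
# Hypothetical consumer double that only remembers whether it was cancelled.
class FakeConsumer
  attr_reader :cancelled

  def initialize
    @cancelled = false
  end

  def cancel
    @cancelled = true
  end
end

# Hypothetical holder for the guard-and-cancel flow; not the real backend.
class ResettableBackend
  attr_reader :consumers

  def initialize
    @consumers = []
    @setup = false
  end

  # Assumed shape of setup: create consumers once, then mark as set up.
  def setup
    return if @setup
    @consumers = [FakeConsumer.new, FakeConsumer.new]
    @setup = true
  end

  def setup?
    @setup
  end

  # Same control flow as the #reset body shown above.
  def reset
    return unless @setup
    @consumers.each(&:cancel)
    @setup = false
  end
end

backend = ResettableBackend.new
backend.reset               # not set up yet: the guard returns early
backend.setup
active = backend.consumers
backend.reset               # cancels both consumers and clears the flag
```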