Class: Tennis::Backend::Rabbit

Inherits:
Abstract
  • Object
show all
Defined in:
lib/tennis/backend/rabbit.rb

Instance Method Summary collapse

Constructor Details

#initialize(logger:, url:, namespace: "tennis") ⇒ Rabbit

Returns a new instance of Rabbit.



15
16
17
18
19
20
21
22
# File 'lib/tennis/backend/rabbit.rb', line 15

def initialize(logger:, url:, namespace: "tennis")
  super(logger: logger)
  @rabbit_url = url
  @rabbit_namespace = namespace
  @payload_queue = RabbitQueue.new
  @job_classes = nil
  @setup = false
end

Instance Method Details

#ack(task) ⇒ Object



38
39
40
41
# File 'lib/tennis/backend/rabbit.rb', line 38

def ack(task)
  delivery_info = task.meta["_backend"]["delivery_info"]
  channel.acknowledge(delivery_info.delivery_tag, false)
end

#enqueue(job:, method:, args:, delay: nil) ⇒ Object

Delayed jobs are not yet supported with Rabbit backend



25
26
27
28
29
30
# File 'lib/tennis/backend/rabbit.rb', line 25

def enqueue(job:, method:, args:, delay: nil)
  queue(job.class)
  meta = { "enqueued_at" => Time.now.to_i }
  task = Task.new(self, generate_task_id, job, method, args, meta)
  exchange.publish(serialize(task), routing_key: routing_key(job.class))
end

#receive(job_classes:, timeout: 1.0) ⇒ Object



32
33
34
35
36
# File 'lib/tennis/backend/rabbit.rb', line 32

def receive(job_classes:, timeout: 1.0)
  setup(job_classes)
  payload = @payload_queue.pop(timeout)
  payload && deserialize_task(*payload)
end

#requeue(task) ⇒ Object



43
44
45
46
# File 'lib/tennis/backend/rabbit.rb', line 43

def requeue(task)
  delivery_info = task.meta["_backend"]["delivery_info"]
  channel.reject(delivery_info.delivery_tag, true)
end

#resetObject

Reset the backend:

  • stop getting messages from the queues

  • requeue unacked messages



51
52
53
54
55
56
# File 'lib/tennis/backend/rabbit.rb', line 51

def reset
  return unless @setup
  @consumers.each(&:cancel)
  @job_classes = nil
  @setup = false
end