Class: Rmsg::Task
- Inherits:
-
Object
- Object
- Rmsg::Task
- Defined in:
- lib/rmsg/task.rb
Overview
Task handles publishing tasks and processing them.
Instance Method Summary collapse
-
#initialize(params) ⇒ Task
constructor
When initializing a task handler, the queue will be declared durable, to survive RabbitMQ restarts.
-
#publish(message) ⇒ Object
Publish a message in the tasks queue.
-
#subscribe {|message| ... } ⇒ Object
Subscribe to the tasks queue.
Constructor Details
#initialize(params) ⇒ Task
When initializing a task handler, the queue will be declared durable, to survive RabbitMQ restarts.
9 10 11 12 |
# File 'lib/rmsg/task.rb', line 9 def initialize(params) @rabbit = params[:rabbit] @queue = @rabbit.channel.queue(params[:queue], durable: true) end |
Instance Method Details
#publish(message) ⇒ Object
Publish a message in the tasks queue. It is marked a persistent to survive RabbitMQ restarts.
17 18 19 |
# File 'lib/rmsg/task.rb', line 17 def publish() @queue.publish(.to_json, presistent: true) end |
#subscribe {|message| ... } ⇒ Object
Subscribe to the tasks queue. Subscribing happens by continuously blocking the current process. It is specifically designed for long running processes. When receiving INT it will gracefully close. Consumer processes have a prefetch value of 1 for round-robin distribution. Consumer processes will send a manual ack after processing, to avoid losing tasks.
28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/rmsg/task.rb', line 28 def subscribe @rabbit.channel.prefetch(1) begin @queue.subscribe(block: true, manual_ack: true) do |delivery_info, , payload| = JSON.parse(payload, symbolize_names: true) yield @rabbit.channel.ack(delivery_info.delivery_tag) end rescue Interrupt => _ @rabbit.close end end |