Class: Rmsg::Task

Inherits:
Object
  • Object
show all
Defined in:
lib/rmsg/task.rb

Overview

Task handles publishing tasks and processing them.

Instance Method Summary collapse

Constructor Details

#initialize(params) ⇒ Task

When initializing a task handler, the queue will be declared durable, to survive RabbitMQ restarts.

Parameters:

  • params (Hash)

Options Hash (params):

  • :rabbit (Rmsg::Rabbit)

    Example: Rmsg::Rabbit.new

  • :queue (String)

    Example: ‘messages’



9
10
11
12
# File 'lib/rmsg/task.rb', line 9

def initialize(params)
  @rabbit = params[:rabbit]
  @queue = @rabbit.channel.queue(params[:queue], durable: true)
end

Instance Method Details

#publish(message) ⇒ Object

Publish a message in the tasks queue. It is marked a persistent to survive RabbitMQ restarts.

Parameters:

  • message (Hash)

    The message to be consumed.



17
18
19
# File 'lib/rmsg/task.rb', line 17

def publish(message)
  @queue.publish(message.to_json, presistent: true)
end

#subscribe {|message| ... } ⇒ Object

Subscribe to the tasks queue. Subscribing happens by continuously blocking the current process. It is specifically designed for long running processes. When receiving INT it will gracefully close. Consumer processes have a prefetch value of 1 for round-robin distribution. Consumer processes will send a manual ack after processing, to avoid losing tasks.

Yield Parameters:

  • message (Hash)

    The message received, to be processed within the block.



28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/rmsg/task.rb', line 28

def subscribe
  @rabbit.channel.prefetch(1)
  begin
    @queue.subscribe(block: true, manual_ack: true) do |delivery_info, , payload|
      message = JSON.parse(payload, symbolize_names: true)
      yield message
      @rabbit.channel.ack(delivery_info.delivery_tag)
    end
  rescue Interrupt => _
    @rabbit.close
  end
end