Class: Rabbitek::Starter

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/rabbitek/server/starter.rb

Overview

Main server startup

Instance Method Summary collapse

Methods included from Loggable

#debug, #error, #info, #logger, logger, #warn

Constructor Details

#initialize(connection, configuration) ⇒ Starter

Returns a new instance of Starter.



9
10
11
12
13
14
# File 'lib/rabbitek/server/starter.rb', line 9

def initialize(connection, configuration)
  @connection = connection
  @queue_name = configuration[:parameters][:queue]
  @consumers = configuration[:consumers]
  @opts = configuration[:parameters]
end

Instance Method Details

#channelObject



26
27
28
29
30
31
32
# File 'lib/rabbitek/server/starter.rb', line 26

def channel
  @channel ||= begin
                 channel = connection.create_channel
                 channel.basic_qos(opts[:basic_qos]) if opts[:basic_qos].present?
                 channel
               end
end

#retry_or_delayed_exchangeObject



50
51
52
53
54
55
56
# File 'lib/rabbitek/server/starter.rb', line 50

def retry_or_delayed_exchange
  @retry_or_delayed_exchange ||= Utils::Common.exchange(
    channel,
    :direct,
    Utils::RabbitObjectNames.retry_or_delayed_bind_exchange(opts[:bind_exchange])
  )
end

#retry_or_delayed_queueObject



42
43
44
45
46
47
48
# File 'lib/rabbitek/server/starter.rb', line 42

def retry_or_delayed_queue
  @retry_or_delayed_queue ||= Utils::Common.queue(
    channel,
    Utils::RabbitObjectNames.retry_or_delayed_queue(opts[:queue]),
    arguments: { 'x-dead-letter-exchange': opts[:bind_exchange] }
  )
end

#startObject



16
17
18
19
20
21
22
23
24
# File 'lib/rabbitek/server/starter.rb', line 16

def start
  setup_bindings!

  work_queue.subscribe(manual_ack: true) do |delivery_info, properties, payload|
    Rabbitek.reloader.call do
      MessageProcessor.new(self, delivery_info, properties, payload).process
    end
  end
end

#work_exchangeObject



34
35
36
# File 'lib/rabbitek/server/starter.rb', line 34

def work_exchange
  @work_exchange ||= Utils::Common.exchange(channel, 'direct', opts[:bind_exchange])
end

#work_queueObject



38
39
40
# File 'lib/rabbitek/server/starter.rb', line 38

def work_queue
  @work_queue ||= Utils::Common.queue(channel, queue_name, opts[:queue_attributes])
end