Class: Istox::BunnyBoot

Inherits:
Object
  • Object
show all
Defined in:
lib/istox/helpers/bunny_boot.rb

Class Method Summary collapse

Class Method Details

.channelObject

INTERNAL METHODS START



35
36
37
38
39
40
41
42
# File 'lib/istox/helpers/bunny_boot.rb', line 35

def channel
  return @channel if @channel.present?

  @channel = connection.create_channel(nil, data['channel_pool_size'] || 1)
  @channel.prefetch(data['channel_prefetch'] || 1)

  @channel
end

.connectionObject



105
106
107
108
109
110
111
112
113
114
# File 'lib/istox/helpers/bunny_boot.rb', line 105

def connection
  return @connection if @connection.present?

  raise 'RabbitMQ connection configuration not found, have you forgotten to define connect key in amqp.yml?' if data['connect'].nil?

  @connection = Bunny.new(data['connect'].symbolize_keys)
  @connection.start

  @connection
end

.create_new_channel(channel_config) ⇒ Object

calling this method will spawn another channel, please use the normal channel method if you like to use the same channel



45
46
47
48
49
50
# File 'lib/istox/helpers/bunny_boot.rb', line 45

def create_new_channel(channel_config)
  channel = connection.create_channel(nil, channel_config['channel_pool_size'] || data['channel_pool_size'] || 1)
  channel.prefetch(channel_config['channel_prefetch'] || data['channel_prefetch'] || 1)

  channel
end

.dataObject



99
100
101
102
103
# File 'lib/istox/helpers/bunny_boot.rb', line 99

def data
  raise 'Unable to find amqp configuration file, have you forgotten to initialise Istox::BunnyBoot?' if @amqp_config_path.empty?

  @data ||= JSON.parse(YAML.load_file(@amqp_config_path).to_json)
end

.exchange_config!(consumer_key) ⇒ Object



116
117
118
119
120
121
122
123
# File 'lib/istox/helpers/bunny_boot.rb', line 116

def exchange_config!(consumer_key)
  exchange_name = data['queues'][consumer_key]['exchange']
  exchange_config = data['exchanges'][exchange_name]

  raise "Exchange #{exchange_name} config not found, have you forgotten to define the exchange in amqp.yml?" if exchange_config.nil?

  exchange_config
end

.initialize!(amqp_config_path) ⇒ Object

path to amqp.yml



8
9
10
11
12
# File 'lib/istox/helpers/bunny_boot.rb', line 8

def initialize!(amqp_config_path)
  @amqp_config_path = amqp_config_path

  data
end

.start_subscribe(consumer_key: nil, consumer_keys: nil) ⇒ Object

optionally can pass in consumer_key for single subscription / consumer_keys for multiple subcriptions consumer_key must be defined in amqp.yml if nothing pass in it will auto subscribe to all available consumers defined in amqp.yml queues key



17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/istox/helpers/bunny_boot.rb', line 17

def start_subscribe(consumer_key: nil, consumer_keys: nil)
  subscribing_consumer_keys = consumer_keys.present? ? consumer_keys : []

  if subscribing_consumer_keys.empty? && consumer_key.nil?
    subscribing_consumer_keys = data['queues'].keys
  elsif subscribing_consumer_keys.empty? && consumer_key.present?
    subscribing_consumer_keys = [consumer_key]
  end

  subscribing_consumer_keys.each do |key|
    subscribe_to_consumer(key.to_s)
  end
end

.subscribe_to_consumer(consumer_key) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/istox/helpers/bunny_boot.rb', line 52

def subscribe_to_consumer(consumer_key)
  queue_config = data['queues'][consumer_key]

  raise "Unable to find consumer with key #{consumer_key}, have you forgotten to define it in amqp.yml?" if queue_config.nil?

  klass = Object.const_get(
    '::' + (queue_config['ruby_class'].nil? ? "#{consumer_key.to_s.underscore}_consumer" : queue_config['ruby_class']).camelize
  )

  exchange = nil
  exchange_config = exchange_config!(consumer_key)

  active_channel = queue_config['channel'].present? ? create_new_channel(queue_config['channel']) : channel

  queue_options = {}

  exchange_durable = exchange_config['durable'].nil? ? true : exchange_config['durable']

  case exchange_config['type']
  when 'fanout'
    exchange = active_channel.fanout(queue_config['exchange'], durable: exchange_durable)
  when 'direct'
    exchange = active_channel.direct(queue_config['exchange'], durable: exchange_durable)
    queue_options[:routing_key] = queue_config['queue_name']
  else
    raise "Exchange type #{exchange_config.type} is not valid/supported."
  end

  queue = active_channel.queue(queue_config['queue_name'],
                               durable: queue_config['durable'].nil? ? exchange_durable : queue_config['durable']).bind(exchange, queue_options)

  manual_ack = queue_config['manual_ack'].nil? ? true : queue_config['manual_ack']

  queue.subscribe manual_ack: manual_ack do |delivery_info, , payload|
    processing_paylod = JSON.parse(payload)
    payload_object = ::Istox::CommonHelper.to_open_struct(processing_paylod)

    log.info "Processing in consumer: #{klass}, paylod: #{payload_object.to_h.inspect}"

    klass.new.process(payload_object, , delivery_info)
    active_channel.ack(delivery_info.delivery_tag) if manual_ack
  rescue StandardError => e
    log.error(e)
    active_channel.nack(delivery_info.delivery_tag, false, true) if manual_ack
  end
end