Module: BBK::AMQP::Utils

Defined in:
lib/bbk/amqp/utils.rb

Class Method Summary collapse

Class Method Details

.commonname(cert_path) ⇒ String

Extract CN certificate attribute from certificate path

Parameters:

  • cert_path (String)

    path to certificate file

Returns:

  • (String)

    certificate CN attribute value



60
61
62
63
# File 'lib/bbk/amqp/utils.rb', line 60

def self.commonname(cert_path)
  cert = OpenSSL::X509::Certificate.new(File.read(cert_path))
  cert.subject.to_a.find {|name, _, _| name == 'CN' }[1]
end

.create_connection(options = {}) ⇒ Bunny::Session

Set default options and create non started connection to amqp

Parameters:

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :hosts (String)

    List of Amqp hosts (default MQ_HOST env variable or mq)

  • :hostname (String)

    Amqp host (default MQ_HOST env variable or mq)

  • :port (Integer)

    Amqp port (default MQ_PORT env variable or 5671 - default tls port)

  • :vhost (String)

    Connected amqp virtual host (default MQ_VHOST env variable or /)

  • :tls (Boolean)

    Use tls (default true)

  • :tls_cert (String)

    Path to certificate file (default config/keys/cert.pem)

  • :tls_key (String)

    Path to key file (default config/keys/key.pem)

  • :tls_ca_certificates (Array)

    List to ca certificates (default config/keys/cacert.pem)

  • :verify (String)

    Verification option server certificate *

  • :verify_peer (String)

    Verification option server certificate *

  • :verify_ssl (String)

    Verification option server certificate *

  • :auth_mechanism (String)

    Amqp authorization mechanism (default EXTERNAL)

  • :automatically_recover (Boolean)

    Allow automatic network failure recovery (default false)

  • :automatic_recovery (Boolean)

    Alias for automatically_recover (default false)

  • :recovery_attempts (Integer)

    Limits the number of connection recovery attempts performed by Bunny (default 0, nil - unlimited)

  • :recover_from_connection_close (Boolean) — default: default false

Returns:

  • (Bunny::Session)

    non started amqp connection



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/bbk/amqp/utils.rb', line 83

def self.create_connection(options = {})
  hosts = [options[:hosts] || options[:host] || options[:hostname]].flatten.select(&:present?).uniq
  hosts = hosts.map{|h| h.split(/[;|]/) }.flatten.select(&:present?).uniq

  options[:hosts] = if hosts.empty?
    [ENV.fetch('MQ_HOST', 'mq')].split(/[;|]/).flatten.select(&:present?).uniq
  else
    hosts
  end

  options[:port] ||= ENV['MQ_PORT'] || 5671
  options[:vhost] ||= ENV['MQ_VHOST'] || '/'
  user = options[:username] || options[:user] || ENV['MQ_USER']
  options[:username] = options[:user] = user

  # Передаем пустую строку чтобы bunny не использовал пароль по умолчанию guest
  pwd = options[:password] || options[:pass] || options[:pwd] || ENV['MQ_PASS'] || ''
  options[:password] = options[:pass] = options[:pwd] = pwd

  options[:tls] = options.fetch(:tls, true)
  options[:tls_cert] ||= 'config/keys/cert.pem'
  options[:tls_key] ||= 'config/keys/key.pem'
  options[:tls_ca_certificates] ||= ['config/keys/cacert.pem']

  options[:verify] =
    options.fetch(:verify, options.fetch(:verify_peer, options.fetch(:verify_ssl, nil)))
  options[:verify] = true if options[:verify]
  options[:verify_peer] = options[:verify]
  options[:verify_ssl] = options[:verify]

  options[:auth_mechanism] ||= if options[:tls]
    'EXTERNAL'
  else
    'PLAIN'
  end

  options[:automatically_recover] ||= false
  options[:automatic_recovery]    ||= false
  options[:recovery_attempts]     ||= 0
  options[:recover_attempts] = options[:recovery_attempts]
  options[:recover_from_connection_close] ||= false
  Bunny.new(options)
end

.pop(queue, timeout = 10) ⇒ Array

Try get message from amqp queue

Parameters:

  • queue (Bunny::Queue)
  • timeout (Integer) (defaults to: 10)

    in seconds for waiting message message in queue

Returns:

  • (Array)

    array with delivery_info, metadata and payload

Raises:

  • (Timeout::Error)

    if queue empty in timeout time duration



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/bbk/amqp/utils.rb', line 15

def self.pop(queue, timeout = 10)
  unblocker = Queue.new
  consumed = false
  mx = Mutex.new
  # Если сообщений несколько то порядок может поменяться и это нужно иметь в виду.
  # Решить можно создав, отдельный канал с qos 1.
  consumer = queue.subscribe(block: false, manual_ack: true) do |delivery_info, , payload|
    mx.synchronize do
      if consumed
        queue.channel.nack(delivery_info.delivery_tag, false, true)
        sleep 0.1
        break
      end
      consumed = true
      message = [
        delivery_info,
        .to_hash.with_indifferent_access,
        begin
          Oj.load(payload).with_indifferent_access
        rescue StandardError
          payload
        end
      ]
      unblocker << message
    end
  end
  Thread.new do
    sleep timeout
    unblocker << :timeout
  end
  result = unblocker.pop
  if result == :timeout
    consumed = true
    consumer.cancel
    raise ::Timeout::Error
  end
  queue.channel.ack(result[0].delivery_tag)
  sleep 0.2
  consumer.cancel
  result
end