Class: Spacebunny::Device::Amqp

Inherits:
Base
  • Object
show all
Defined in:
lib/spacebunny/device/amqp.rb

Constant Summary collapse

DEFAULT_CHANNEL_OPTIONS =
{ passive: true }
ACK_TYPES =
[:manual, :auto]

Instance Attribute Summary collapse

Attributes inherited from Base

#api_endpoint, #auto_configs, #auto_connection_configs, #channels, #connection_configs, #custom_connection_configs, #host, #id, #key, #log_level, #log_to, #logger, #name, #raise_on_error, #secret, #tls, #tls_ca_certificates, #tls_cert, #tls_key, #verify_peer, #vhost

Instance Method Summary collapse

Methods inherited from Base

#connection_options=

Constructor Details

#initialize(*args) ⇒ Amqp

Returns a new instance of Amqp.



11
12
13
14
15
# File 'lib/spacebunny/device/amqp.rb', line 11

def initialize(*args)
  super(:amqp, *args)
  @built_channels = {}
  @built_exchanges = {}
end

Instance Attribute Details

#built_channelsObject (readonly)

Returns the value of attribute built_channels.



9
10
11
# File 'lib/spacebunny/device/amqp.rb', line 9

def built_channels
  @built_channels
end

#built_exchangesObject (readonly)

Returns the value of attribute built_exchanges.



9
10
11
# File 'lib/spacebunny/device/amqp.rb', line 9

def built_exchanges
  @built_exchanges
end

#clientObject (readonly)

Returns the value of attribute client.



9
10
11
# File 'lib/spacebunny/device/amqp.rb', line 9

def client
  @client
end

Instance Method Details

#connectObject



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/spacebunny/device/amqp.rb', line 17

def connect
  # 'Fix' attributes: start from common connection configs and adjust attributes to match what Bunny
  # wants as connection args
  connection_params = connection_configs.dup
  connection_params[:user] = connection_params.delete :device_id
  connection_params[:password] = connection_params.delete :secret
  # Default on a tls connection
  unless connection_params[:tls] == false
    connection_params[:port] = connection_params.delete(:tls_port)
  end
  connection_params[:log_level] = connection_params.delete(:log_level) || ::Logger::ERROR

  # Re-create client every time connect is called
  @client = Bunny.new(connection_params)
  @client.start
  logger.info 'Connected to SpaceBunny'
end

#disconnectObject



35
36
37
38
39
40
41
# File 'lib/spacebunny/device/amqp.rb', line 35

def disconnect
  super
  @built_exchanges = {}
  @built_channels = {}
  client.stop if client
  logger.info 'Disconnected from SpaceBunny'
end

#input_channelObject



43
44
45
46
# File 'lib/spacebunny/device/amqp.rb', line 43

def input_channel
  return @input_channel if @input_channel
  @input_channel = client.create_channel
end

#on_receive(options = {}) ⇒ Object Also known as: inbox



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/spacebunny/device/amqp.rb', line 48

def on_receive(options = {})
  unless block_given?
    raise BlockRequired
  end
  blocking = options.fetch :wait, false
  to_ack, auto_ack = parse_ack options.fetch(:ack, :manual)

  input_queue.subscribe(block: blocking, manual_ack: to_ack) do |delivery_info, , payload|
    message = Device::Message.new self, options, delivery_info, , payload

    # Skip message if required
    if message.blacklisted?
      message.nack
      next
    end

    yield message

    # If ack is :auto then ack current message
    if to_ack && auto_ack
      message.ack
    end
  end
end

#publish(channel_name, message, options = {}) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/spacebunny/device/amqp.rb', line 74

def publish(channel_name, message, options = {})
  if check_client
    channel_key = if options[:with_confirm]
                    "#{channel_name}_confirm"
                  else
                    channel_name
                  end.to_sym

    unless @built_exchanges[channel_key]
      @built_exchanges[channel_key] = create_channel(channel_name, options)
    end
    # Call Bunny "publish"
    res = @built_exchanges[channel_key].publish message, channel_options(channel_name, options)
    @logger.debug 'Message published'
    res
  else
    @logger.debug 'Message NOT published due to client not connected'
    false
  end
end

#wait_for_publish_confirmsObject



95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/spacebunny/device/amqp.rb', line 95

def wait_for_publish_confirms
  results = {}
  threads = []
  @built_channels.each do |name, channel|
    if channel.using_publisher_confirmations?
      threads << Thread.new do
        results[name] = { all_confirmed: channel.wait_for_confirms, nacked_set: channel.nacked_set }
      end
    end
  end
  threads.map{ |t| t.join }
  results
end