Class: Spacebunny::Device::Amqp
- Inherits: Base
  - Object
  - Base
  - Spacebunny::Device::Amqp
- Defined in:
- lib/spacebunny/device/amqp.rb
Constant Summary
collapse
- DEFAULT_CHANNEL_OPTIONS = { passive: true }
- ACK_TYPES = [:manual, :auto]
Instance Attribute Summary collapse
Attributes inherited from Base
#api_endpoint, #auto_configs, #auto_connection_configs, #channels, #connection_configs, #custom_connection_configs, #host, #id, #key, #log_level, #log_to, #logger, #name, #raise_on_error, #secret, #tls, #tls_ca_certificates, #tls_cert, #tls_key, #verify_peer, #vhost
Instance Method Summary
collapse
Methods inherited from Base
#connection_options=
Constructor Details
#initialize(*args) ⇒ Amqp
Returns a new instance of Amqp.
11
12
13
14
15
|
# File 'lib/spacebunny/device/amqp.rb', line 11
# Builds an AMQP device.
#
# Forwards +:amqp+ followed by the caller's arguments to the parent
# constructor, then starts with empty channel and exchange caches.
#
# @param args [Array] arguments passed through unchanged to the superclass
def initialize(*args)
  super(:amqp, *args)
  @built_channels, @built_exchanges = {}, {}
end
|
Instance Attribute Details
#built_channels ⇒ Object
Returns the value of attribute built_channels.
9
10
11
|
# File 'lib/spacebunny/device/amqp.rb', line 9
# Reader for the channel cache.
#
# @return [Hash] the object held in +@built_channels+; the constructor and
#   #disconnect both set it to an empty Hash, and
#   #wait_for_publish_confirms walks it as name => channel pairs
def built_channels
  @built_channels
end
|
#built_exchanges ⇒ Object
Returns the value of attribute built_exchanges.
9
10
11
|
# File 'lib/spacebunny/device/amqp.rb', line 9
# Reader for the exchange cache.
#
# @return [Hash] the object held in +@built_exchanges+; #publish stores the
#   result of +create_channel+ in it under a symbol key and reuses that
#   entry on later calls
def built_exchanges
  @built_exchanges
end
|
#client ⇒ Object
Returns the value of attribute client.
9
10
11
|
# File 'lib/spacebunny/device/amqp.rb', line 9
# Reader for the underlying connection object.
#
# @return [Object, nil] whatever is held in +@client+; #connect assigns the
#   session it builds with +Bunny.new+ here
def client
  @client
end
|
Instance Method Details
#connect ⇒ Object
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
# File 'lib/spacebunny/device/amqp.rb', line 17
# Opens the connection to the broker.
#
# Starts from a copy of +connection_configs+ and reshapes it into the
# options hash given to +Bunny.new+:
# * +:device_id+ is moved to +:user+ and +:secret+ to +:password+;
# * unless +:tls+ is explicitly +false+, +:tls_port+ (when present) is
#   moved to +:port+;
# * +:log_level+ falls back to +::Logger::ERROR+ when missing or nil.
#
# The session is stored in +@client+ and started, then an info line is
# logged.
def connect
  # Work on a copy so the stored connection_configs keep their own keys.
  connection_params = connection_configs.dup
  connection_params[:user] = connection_params.delete(:device_id)
  connection_params[:password] = connection_params.delete(:secret)

  unless connection_params[:tls] == false
    # Only override :port when a TLS port was actually supplied; writing the
    # result of delete unconditionally would replace a configured :port with
    # nil whenever :tls_port is absent.
    tls_port = connection_params.delete(:tls_port)
    connection_params[:port] = tls_port if tls_port
  end

  connection_params[:log_level] ||= ::Logger::ERROR

  @client = Bunny.new(connection_params)
  @client.start
  logger.info 'Connected to SpaceBunny'
end
|
#disconnect ⇒ Object
35
36
37
38
39
40
41
|
# File 'lib/spacebunny/device/amqp.rb', line 35
# Tears down the connection state held by this device.
#
# Calls the parent implementation, drops every cached exchange and channel
# (including the memoized input channel), stops the client when one is
# set, and logs an info line.
def disconnect
  super
  @built_exchanges = {}
  @built_channels = {}
  # #input_channel memoizes a channel created from the current client; clear
  # it too so the next call builds a channel from whatever client is then
  # current instead of returning the one tied to the client stopped below.
  @input_channel = nil
  client.stop if client
  logger.info 'Disconnected from SpaceBunny'
end
|
#input_channel ⇒ Object
43
44
45
46
|
# File 'lib/spacebunny/device/amqp.rb', line 43
# Lazily created channel, memoized in +@input_channel+.
#
# The first call asks the client for a new channel via +create_channel+;
# later calls return that same object.
#
# @return [Object] the value returned by +client.create_channel+
def input_channel
  @input_channel ||= client.create_channel
end
|
#on_receive(options = {}) ⇒ Object
Also known as: inbox
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
# File 'lib/spacebunny/device/amqp.rb', line 48
# Subscribes to the input queue and hands every accepted message to the
# given block.
#
# Each delivery is wrapped in a +Device::Message+. Messages reporting
# +blacklisted?+ are nacked and never reach the block. Other messages are
# yielded and then acked when both flags returned by +parse_ack+ are truthy.
#
# @param options [Hash] also passed to every +Device::Message+
# @option options [Object] :wait (false) forwarded as +block:+ to +subscribe+
# @option options [Object] :ack (:manual) handed to +parse_ack+, whose first
#   result is forwarded as +manual_ack:+ to +subscribe+
# @yieldparam message [Device::Message] the wrapped delivery
# @raise [BlockRequired] when called without a block
# @return [Object] whatever +input_queue.subscribe+ returns
def on_receive(options = {})
  raise BlockRequired unless block_given?

  wait = options.fetch(:wait, false)
  manual_ack, ack_after_yield = parse_ack(options.fetch(:ack, :manual))
  ack_when_done = manual_ack && ack_after_yield

  input_queue.subscribe(block: wait, manual_ack: manual_ack) do |delivery_info, metadata, payload|
    message = Device::Message.new(self, options, delivery_info, metadata, payload)
    if message.blacklisted?
      message.nack
      next
    end

    yield message
    message.ack if ack_when_done
  end
end
|
#publish(channel_name, message, options = {}) ⇒ Object
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
# File 'lib/spacebunny/device/amqp.rb', line 74
# Publishes +message+ through the exchange cached for +channel_name+.
#
# When +check_client+ is falsy nothing is sent, a debug line is logged and
# +false+ is returned. Otherwise the exchange is looked up in
# +@built_exchanges+ (created with +create_channel+ on first use) and the
# message is published with the options computed by +channel_options+.
# With +options[:with_confirm]+ set, a separate cache entry keyed
# +"<channel_name>_confirm"+ is used.
#
# @param channel_name [#to_sym] channel to publish on
# @param message [Object] payload handed to the exchange's +publish+
# @param options [Hash] forwarded to +create_channel+ and +channel_options+
# @return [Object, false] the exchange's +publish+ result, or +false+ when
#   the client check fails
def publish(channel_name, message, options = {})
  unless check_client
    @logger.debug 'Message NOT published due to client not connected'
    return false
  end

  exchange_key = (options[:with_confirm] ? "#{channel_name}_confirm" : channel_name).to_sym
  @built_exchanges[exchange_key] ||= create_channel(channel_name, options)

  outcome = @built_exchanges[exchange_key].publish(message, channel_options(channel_name, options))
  @logger.debug 'Message published'
  outcome
end
|
#wait_for_publish_confirms ⇒ Object
95
96
97
98
99
100
101
102
103
104
105
106
107
|
# File 'lib/spacebunny/device/amqp.rb', line 95
# Waits, in parallel, for publisher confirms on every cached channel that
# reports +using_publisher_confirmations?+.
#
# One thread per such channel calls +wait_for_confirms+ and then reads
# +nacked_set+. All threads are started before any is joined. Each thread
# returns its own result, which the calling thread collects, so no Hash is
# mutated concurrently and the keys follow +@built_channels+ order rather
# than thread completion order.
#
# @return [Hash] channel name => +{ all_confirmed:, nacked_set: }+; empty
#   when no cached channel uses publisher confirmations
def wait_for_publish_confirms
  confirming = @built_channels.select { |_name, channel| channel.using_publisher_confirmations? }

  # Phase 1: spawn every waiter so the channels are awaited concurrently.
  waiters = confirming.map do |name, channel|
    waiter = Thread.new do
      { all_confirmed: channel.wait_for_confirms, nacked_set: channel.nacked_set }
    end
    [name, waiter]
  end

  # Phase 2: Thread#value joins and, like join, re-raises a thread's error.
  waiters.each_with_object({}) do |(name, waiter), results|
    results[name] = waiter.value
  end
end
|