Module: Legion::Extensions::Transport
Instance Attribute Summary collapse
Instance Method Summary
collapse
#handle_exception, #log
#build_default_exchange, #default_exchange, #transport_class, #transport_path
#actor_class, #actor_const, #actor_name, #calling_class, #calling_class_array, #from_json, #full_path, #lex_class, #lex_const, #lex_name, #normalize, #runner_class, #runner_const, #runner_name, #to_dotted_hash
Instance Attribute Details
#consumers ⇒ Object
Returns the value of attribute consumers.
7
8
9
|
# File 'lib/legion/extensions/transport.rb', line 7
def consumers
@consumers
end
|
#exchanges ⇒ Object
Returns the value of attribute exchanges.
7
8
9
|
# File 'lib/legion/extensions/transport.rb', line 7
def exchanges
@exchanges
end
|
#messages ⇒ Object
Returns the value of attribute messages.
7
8
9
|
# File 'lib/legion/extensions/transport.rb', line 7
def messages
@messages
end
|
#queues ⇒ Object
Returns the value of attribute queues.
7
8
9
|
# File 'lib/legion/extensions/transport.rb', line 7
def queues
@queues
end
|
Instance Method Details
#additional_e_to_q ⇒ Object
154
155
156
|
# File 'lib/legion/extensions/transport.rb', line 154
def additional_e_to_q
[]
end
|
#auto_create_dlx_exchange ⇒ Object
69
70
71
72
73
74
75
76
77
78
79
80
81
|
# File 'lib/legion/extensions/transport.rb', line 69
def auto_create_dlx_exchange
dlx = if transport_class::Exchanges.const_defined? 'Dlx'
transport_class::Exchanges::Dlx
else
transport_class::Exchanges.const_set('Dlx', Class.new(default_exchange) do
def exchange_name
"#{super}.dlx"
end
end)
end
dlx.new
end
|
#auto_create_dlx_queue ⇒ Object
83
84
85
86
87
88
89
|
# File 'lib/legion/extensions/transport.rb', line 83
def auto_create_dlx_queue
return if transport_class::Queues.const_defined?('Dlx')
special_name = default_exchange.new.exchange_name
dlx_queue = Legion::Transport::Queue.new "#{special_name}.dlx", auto_delete: false
dlx_queue.bind("#{special_name}.dlx", { routing_key: '#' })
end
|
#auto_create_exchange(exchange, default_exchange = false) ⇒ Object
rubocop:disable Style/OptionalBooleanParameter
46
47
48
49
50
51
52
53
54
55
56
57
58
|
# File 'lib/legion/extensions/transport.rb', line 46
def auto_create_exchange(exchange, default_exchange = false) if Object.const_defined? exchange
Legion::Logging.warn "#{exchange} is already defined"
return
end
return build_default_exchange if default_exchange
transport_class::Exchanges.const_set(exchange.split('::').pop, Class.new(Legion::Transport::Exchange) do
def exchange_name
self.class.ancestors.first.to_s.split('::')[5].downcase
end
end)
end
|
#auto_create_queue(queue) ⇒ Object
60
61
62
63
64
65
66
67
|
# File 'lib/legion/extensions/transport.rb', line 60
def auto_create_queue(queue)
if Kernel.const_defined?(queue)
Legion::Logging.warn "#{queue} is already defined"
return
end
transport_class::Queues.const_set(queue.split('::').last, Class.new(Legion::Transport::Queue))
end
|
#bind(from, to, routing_key: nil, **_options) ⇒ Object
131
132
133
134
135
136
137
138
139
|
# File 'lib/legion/extensions/transport.rb', line 131
def bind(from, to, routing_key: nil, **_options)
from = from.is_a?(String) ? Kernel.const_get(from).new : from.new
to = to.is_a?(String) ? Kernel.const_get(to).new : to.new
to.bind(from, routing_key: routing_key)
rescue StandardError => e
log.fatal e.message
log.fatal e.backtrace
log.fatal({ from: from, to: to, routing_key: routing_key })
end
|
#bind_e_to_q(to:, from: default_exchange, routing_key: nil) ⇒ Object
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/legion/extensions/transport.rb', line 100
def bind_e_to_q(to:, from: default_exchange, routing_key: nil, **)
if from.is_a? String
from = "#{transport_class}::Exchanges::#{from.split('_').collect(&:capitalize).join}" unless from.include?('::')
auto_create_exchange(from) unless Object.const_defined? from
end
if to.is_a? String
to = "#{transport_class}::Queues::#{to.split('_').collect(&:capitalize).join}" unless to.include?('::')
auto_create_queue(to) unless Object.const_defined?(to)
end
routing_key = to.to_s.split('::').last.downcase if routing_key.nil?
bind(from, to, routing_key: routing_key)
end
|
#build ⇒ Object
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
# File 'lib/legion/extensions/transport.rb', line 9
def build
@queues = []
@exchanges = []
@messages = []
@consumers = []
generate_base_modules
require_transport_items
build_e_to_e
build_e_to_q(e_to_q)
build_e_to_q(additional_e_to_q)
auto_create_dlx_exchange
auto_create_dlx_queue
rescue StandardError => e
Legion::Logging.error e.message
Legion::Logging.error e.backtrace
end
|
#build_e_to_e ⇒ Object
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
# File 'lib/legion/extensions/transport.rb', line 115
def build_e_to_e
e_to_e.each do |binding|
if binding[:from].is_a? String
binding[:from] = "#{transport_class}::Exchanges::#{binding[:from].capitalize}" unless binding[:from].include?('::')
auto_create_exchange(binding[:from]) unless Object.const_defined? binding[:from]
end
if binding[:to].is_a? String
binding[:to] = "#{transport_class}::Exchanges::#{binding[:to].capitalize}" unless binding[:to].include?('::')
auto_create_exchange(binding[:to]) unless Object.const_defined? binding[:to]
end
bind(binding[:from], binding[:to], binding)
end
end
|
#build_e_to_q(array) ⇒ Object
91
92
93
94
95
96
97
98
|
# File 'lib/legion/extensions/transport.rb', line 91
def build_e_to_q(array)
array.each do |binding|
binding[:routing_key] = nil unless binding.key? :routing_key
binding[:to] = nil unless binding.key?(:to)
binding[:from] = default_exchange if !binding.key?(:from) || binding[:from].nil?
bind_e_to_q(**binding)
end
end
|
#e_to_e ⇒ Object
150
151
152
|
# File 'lib/legion/extensions/transport.rb', line 150
def e_to_e
[]
end
|
#e_to_q ⇒ Object
141
142
143
144
145
146
147
148
|
# File 'lib/legion/extensions/transport.rb', line 141
def e_to_q
[] if !@exchanges.count != 1
auto = []
@queues.each do |queue|
auto.push(from: @exchanges.first, to: queue, routing_key: queue)
end
auto
end
|
#generate_base_modules ⇒ Object
27
28
29
30
31
32
33
34
|
# File 'lib/legion/extensions/transport.rb', line 27
def generate_base_modules
lex_class.const_set('Transport', Module.new) unless lex_class.const_defined?('Transport')
%w[Queues Exchanges Messages Consumers].each do |thing|
next if transport_class.const_defined? thing
transport_class.const_set(thing, Module.new)
end
end
|
#require_transport_items ⇒ Object
36
37
38
39
40
41
42
43
44
|
# File 'lib/legion/extensions/transport.rb', line 36
def require_transport_items
{ exchanges: @exchanges, queues: @queues, consumers: @consumers, messages: @messages }.each do |item, obj|
Dir[File.expand_path("#{transport_path}/#{item}/*.rb")].sort.each do |file|
require file
file_name = file.to_s.split('/').last.split('.').first
obj.push(file_name) unless obj.include?(file_name)
end
end
end
|