Class: Legion::Transport::Message
- Inherits:
-
Object
- Object
- Legion::Transport::Message
show all
- Includes:
- Common
- Defined in:
- lib/legion/transport/message.rb
Instance Method Summary
collapse
Methods included from Common
#channel, #channel_open?, #close, #close!, #deep_merge, #generate_consumer_tag, #open_channel, #options_builder
Constructor Details
#initialize(**options) ⇒ Message
Returns a new instance of Message.
6
7
8
9
|
# File 'lib/legion/transport/message.rb', line 6
# Builds a message from arbitrary keyword options.
#
# The options hash is kept as-is in @options, then #validate is run so the
# validity flag is set before the message can be published.
#
# @param options [Hash] keyword options describing the message
def initialize(**options)
  @options = options

  validate
end
|
Instance Method Details
#content_encoding ⇒ Object
93
94
95
|
# File 'lib/legion/transport/message.rb', line 93
# Fallback content encoding, used by #publish when the publish options do
# not carry a :content_encoding value.
#
# @return [String] always 'identity'
def content_encoding
  'identity'
end
|
#content_type ⇒ Object
89
90
91
|
# File 'lib/legion/transport/message.rb', line 89
# Fallback content type, used by #publish when the publish options do not
# carry a :content_type value.
#
# @return [String] always 'application/json'
def content_type
  'application/json'
end
|
#encode_message ⇒ Object
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
# File 'lib/legion/transport/message.rb', line 35
# Serializes the message body and, when encryption is enabled, encrypts it.
#
# Non-String payloads are dumped with Legion::JSON. As a side effect,
# @options[:content_encoding] is set to 'identity' (plain) or 'encrypted/cs'
# (encrypted). For encrypted payloads the initialization vector is stored
# under @options[:headers][:iv].
#
# @return [String, Object] the plain payload, or the :enciphered_message
#   value returned by Legion::Crypt.encrypt
def encode_message
  message_payload = message
  message_payload = Legion::JSON.dump(message_payload) unless message_payload.is_a? String

  unless encrypt?
    @options[:content_encoding] = 'identity'
    return message_payload
  end

  encrypted = Legion::Crypt.encrypt(message_payload)
  # Write the IV straight into the header hash (creating it when absent) so
  # this method does not depend on any other helper having run first.
  (@options[:headers] ||= {})[:iv] = encrypted[:iv]
  @options[:content_encoding] = 'encrypted/cs'
  encrypted[:enciphered_message]
end
|
#encrypt? ⇒ Boolean
55
56
57
|
# File 'lib/legion/transport/message.rb', line 55
# Tells whether outgoing messages should be encrypted.
#
# Both the transport setting (:transport -> :messages -> :encrypt) and the
# crypt readiness flag (:crypt -> :cs_encrypt_ready) have to be truthy.
#
# @return [Boolean, nil] the falsy transport setting when encryption is not
#   requested, otherwise the value of the readiness flag
def encrypt?
  requested = Legion::Settings[:transport][:messages][:encrypt]
  return requested unless requested

  Legion::Settings[:crypt][:cs_encrypt_ready]
end
|
#encrypt_message(message, _type = 'cs') ⇒ Object
51
52
53
|
# File 'lib/legion/transport/message.rb', line 51
# Encrypts a payload by delegating to Legion::Crypt.encrypt.
#
# @param message [Object] payload handed to Legion::Crypt.encrypt unchanged
# @param _type [String] accepted but not used by this method
# @return [Object] whatever Legion::Crypt.encrypt returns
def encrypt_message(message, _type = 'cs')
  Legion::Crypt.encrypt(message)
end
|
#exchange ⇒ Object
64
65
66
|
# File 'lib/legion/transport/message.rb', line 64
# Resolves the exchange constant named by #exchange_name.
#
# @return [Object] the constant found by Kernel.const_get
# @raise [NameError] when no constant with that name exists
def exchange
  constant_path = exchange_name
  Kernel.const_get(constant_path)
end
|
#exchange_name ⇒ Object
59
60
61
62
|
# File 'lib/legion/transport/message.rb', line 59
# Derives the fully qualified exchange constant name for this message.
#
# The third '::'-separated segment of the first ancestor's name is
# lower-cased and then capitalized, and used both as the extension namespace
# and as the exchange name.
#
# @return [String] 'Legion::Extensions::<Lex>::Transport::Exchanges::<Lex>'
def exchange_name
  third_segment = self.class.ancestors.first.to_s.split('::')[2]
  lex_const = third_segment.downcase.capitalize

  "Legion::Extensions::#{lex_const}::Transport::Exchanges::#{lex_const}"
end
|
#expiration ⇒ Object
85
86
87
|
# File 'lib/legion/transport/message.rb', line 85
# Fallback expiration, used by #publish when the publish options do not
# carry an :expiration value.
#
# @return [nil] always nil
def expiration
  nil
end
|
#headers ⇒ Object
68
69
70
71
72
73
74
75
76
77
78
79
|
# File 'lib/legion/transport/message.rb', line 68
# Builds the header hash that is published alongside the message.
#
# NOTE(review): the method name and the block variable were missing from
# this listing; `headers` is reconstructed from the `headers:` keyword in
# #publish and the @options[:headers] writes below - confirm against the
# original file.
#
# Each whitelisted key present in @options is copied into
# @options[:headers] as a String; header entries that already exist under
# other keys are left in place.
#
# @return [Hash] @options[:headers]; if a StandardError is raised it is
#   logged and the result of the last Legion::Logging.error call is
#   returned instead
def headers
  @options[:headers] ||= {}
  %i[
    task_id relationship_id trigger_namespace_id trigger_function_id parent_id master_id
    runner_namespace runner_class namespace_id function_id function chain_id debug
  ].each do |header|
    next unless @options.key? header

    @options[:headers][header] = @options[header].to_s
  end
  @options[:headers]
rescue StandardError => e
  Legion::Logging.error e.message
  Legion::Logging.error e.backtrace
end
|
#message ⇒ Object
27
28
29
|
# File 'lib/legion/transport/message.rb', line 27
# The body of the message; #encode_message serializes whatever this returns.
#
# @return [Hash] the @options hash itself (not a copy)
def message
  @options
end
|
#priority ⇒ Object
81
82
83
|
# File 'lib/legion/transport/message.rb', line 81
# Fallback priority, used by #publish when the publish options do not carry
# a :priority value.
#
# @return [Integer] always 0
def priority
  0
end
|
#publish(options = @options) ⇒ Object
rubocop:disable Metrics/AbcSize,Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
# File 'lib/legion/transport/message.rb', line 11
# Publishes the encoded message to this message's exchange.
#
# Exchange instances are cached in the global $exchanges hash, keyed by the
# exchange's #to_s, so an exchange is only instantiated when its key is not
# in the cache yet.
#
# @param options [Hash] per-publish overrides, defaulting to @options. When
#   :content_type, :content_encoding, :type, :priority or :expiration is
#   missing or falsy, the method of the same name supplies the value.
# @return [Object] whatever the exchange's #publish returns
# @raise [RuntimeError] when @valid is not truthy
def publish(options = @options) # rubocop:disable Metrics/AbcSize,Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity
  raise 'message is not valid and cannot be published' unless @valid

  # Resolve the exchange once instead of on every use.
  exchange_class = exchange
  exchange_key = exchange_class.to_s

  $exchanges = {} if $exchanges.nil?
  $exchanges[exchange_key] = exchange_class.new unless $exchanges.key?(exchange_key)

  # encode_message is evaluated before the keyword arguments; it writes
  # @options[:content_encoding], which the lookup below picks up whenever
  # options is @options.
  $exchanges[exchange_key].publish(encode_message,
                                   routing_key: routing_key || '',
                                   content_type: options[:content_type] || content_type,
                                   content_encoding: options[:content_encoding] || content_encoding,
                                   type: options[:type] || type,
                                   priority: options[:priority] || priority,
                                   expiration: options[:expiration] || expiration,
                                   headers: headers)
end
|
#routing_key ⇒ Object
31
32
33
|
# File 'lib/legion/transport/message.rb', line 31
# Routing key for this message; #publish substitutes '' when this is nil.
#
# @return [nil] always nil
def routing_key
  nil
end
|
#timestamp ⇒ Object
101
102
103
|
# File 'lib/legion/transport/message.rb', line 101
# Current wall-clock time as whole seconds since the Unix epoch.
#
# The listing read `now.to_i` with no receiver; no `now` method is defined
# in this class or listed among the methods included from Common, so the
# call is made on Time explicitly.
#
# @return [Integer] epoch seconds at the moment of the call
def timestamp
  Time.now.to_i
end
|
#type ⇒ Object
97
98
99
|
# File 'lib/legion/transport/message.rb', line 97
# Fallback message type, used by #publish when the publish options do not
# carry a :type value.
#
# @return [String] always 'task'
def type
  'task'
end
|
#validate ⇒ Object
105
106
107
|
# File 'lib/legion/transport/message.rb', line 105
# Marks the message as valid; #publish raises unless this flag is truthy.
#
# This implementation performs no checks and accepts every message.
#
# @return [true] the value stored in @valid
def validate
  @valid = true
end
|