Class: Legion::Transport::Message

Inherits:
Object
  • Object
show all
Includes:
Common
Defined in:
lib/legion/transport/message.rb

Instance Method Summary collapse

Methods included from Common

#channel, #channel_open?, #close, #close!, #deep_merge, #generate_consumer_tag, #open_channel, #options_builder

Constructor Details

#initialize(**options) ⇒ Message

Returns a new instance of Message.



6
7
8
9
# File 'lib/legion/transport/message.rb', line 6

# Stores the given keyword options and then runs #validate.
#
# @param options [Hash] arbitrary keyword options, kept as-is in @options
# @return [Message]
def initialize(**options)
  @options = options
  validate
end

Instance Method Details

#content_encoding ⇒ Object



93
94
95
# File 'lib/legion/transport/message.rb', line 93

# Fallback content encoding; #publish uses it when the publish options carry
# no :content_encoding value.
#
# @return [String] 'identity'
def content_encoding
  'identity'
end

#content_type ⇒ Object



89
90
91
# File 'lib/legion/transport/message.rb', line 89

# Fallback content type; #publish uses it when the publish options carry no
# :content_type value.
#
# @return [String] 'application/json'
def content_type
  'application/json'
end

#encode_message ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/legion/transport/message.rb', line 35

# Produces the payload handed to the exchange.
#
# The value of #message is used as-is when it is already a String and is
# otherwise serialised with Legion::JSON.dump. When #encrypt? is truthy the
# serialised text is passed to Legion::Crypt.encrypt, the resulting :iv is
# stored in headers[:iv] and the :enciphered_message is returned instead.
#
# Side effect: records the encoding that was applied in
# @options[:content_encoding] ('encrypted/cs' or 'identity').
#
# @return [Object] the plain payload, or the enciphered message when
#   encryption is enabled
def encode_message
  payload = message
  payload = Legion::JSON.dump(payload) unless payload.is_a?(String)

  unless encrypt?
    @options[:content_encoding] = 'identity'
    return payload
  end

  cipher = Legion::Crypt.encrypt(payload)
  headers[:iv] = cipher[:iv]
  @options[:content_encoding] = 'encrypted/cs'
  cipher[:enciphered_message]
end

#encrypt? ⇒ Boolean

Returns:

  • (Boolean)


55
56
57
# File 'lib/legion/transport/message.rb', line 55

# Whether outgoing messages should be encrypted.
#
# Encryption requires both the transport setting
# Legion::Settings[:transport][:messages][:encrypt] and the crypt readiness
# flag Legion::Settings[:crypt][:cs_encrypt_ready] to be truthy. The crypt
# setting is only consulted when the transport setting is truthy.
#
# @return [Object] the raw setting value; truthy only when both are truthy
def encrypt?
  requested = Legion::Settings[:transport][:messages][:encrypt]
  requested && Legion::Settings[:crypt][:cs_encrypt_ready]
end

#encrypt_message(message, _type = 'cs') ⇒ Object



51
52
53
# File 'lib/legion/transport/message.rb', line 51

# Encrypts the given message by delegating to Legion::Crypt.encrypt.
#
# @param message [Object] value handed to Legion::Crypt.encrypt unchanged
# @param _type [String] not used by this implementation
# @return [Object] whatever Legion::Crypt.encrypt returns
#   (NOTE(review): #encode_message reads :iv and :enciphered_message from the
#   same call, so this is presumably a Hash — confirm against Legion::Crypt)
def encrypt_message(message, _type = 'cs')
  Legion::Crypt.encrypt(message)
end

#exchange ⇒ Object



64
65
66
# File 'lib/legion/transport/message.rb', line 64

# Resolves the constant named by #exchange_name.
#
# @return [Object] the constant found under that name
# @raise [NameError] if no such constant is defined
def exchange
  qualified_name = exchange_name
  Kernel.const_get(qualified_name)
end

#exchange_name ⇒ Object



59
60
61
62
# File 'lib/legion/transport/message.rb', line 59

# Builds the fully qualified constant name of the exchange to publish to.
#
# The name is derived from the third `::`-separated segment of the first
# ancestor of the receiver's class, lower-cased and then capitalised, e.g. a
# class named Legion::Extensions::HTTP::Foo yields
# "Legion::Extensions::Http::Transport::Exchanges::Http".
#
# @return [String]
# @raise [NoMethodError] if the class name has fewer than three segments
def exchange_name
  segments = self.class.ancestors.first.to_s.split('::')
  lex_const = segments[2].downcase.capitalize
  "Legion::Extensions::#{lex_const}::Transport::Exchanges::#{lex_const}"
end

#expiration ⇒ Object



85
86
87
# File 'lib/legion/transport/message.rb', line 85

# Fallback message expiration; #publish uses it when the publish options
# carry no :expiration value.
#
# @return [nil] this implementation sets no expiration
def expiration
  nil
end

#headers ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/legion/transport/message.rb', line 68

# Builds the header hash sent with the message.
#
# The hash lives in @options[:headers] (created on first use). Each of the
# known task-related option keys that is present in @options is copied into
# it as a String, overwriting any header of the same name.
#
# Any StandardError raised while building is logged through Legion::Logging
# and not re-raised; in that case the method returns whatever the last
# logging call returns rather than a header hash.
#
# @return [Hash] the same object stored in @options[:headers]
def headers
  header_map = (@options[:headers] ||= {})
  forwarded = %i[
    task_id relationship_id trigger_namespace_id trigger_function_id
    parent_id master_id runner_namespace runner_class namespace_id
    function_id function chain_id debug
  ]
  forwarded.each do |name|
    header_map[name] = @options[name].to_s if @options.key?(name)
  end
  header_map
rescue StandardError => e
  Legion::Logging.error e.message
  Legion::Logging.error e.backtrace
end

#message ⇒ Object



27
28
29
# File 'lib/legion/transport/message.rb', line 27

# The message body to encode. This implementation returns the options hash
# stored by the constructor (the same object, not a copy).
#
# @return [Hash]
def message
  @options
end

#priority ⇒ Object



81
82
83
# File 'lib/legion/transport/message.rb', line 81

# Fallback message priority; #publish uses it when the publish options carry
# no :priority value.
#
# @return [Integer] 0
def priority
  0
end

#publish(options = @options) ⇒ Object

rubocop:disable Metrics/AbcSize,Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/legion/transport/message.rb', line 11

# Publishes the encoded message to this message's exchange.
#
# Exchange instances are cached in the global $exchanges hash, keyed by the
# exchange's #to_s, so a given exchange is instantiated only once and then
# reused by later publishes.
#
# @param options [Hash] per-publish overrides for :content_type,
#   :content_encoding, :type, :priority and :expiration; defaults to the
#   options stored by the constructor
# @return [Object] whatever the exchange's #publish returns
# @raise [RuntimeError] if the message has not been marked valid
def publish(options = @options) # rubocop:disable Metrics/AbcSize,Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity
  # A bare `raise` here would surface as "unhandled exception"; keep the same
  # exception class but say what actually went wrong.
  raise "#{self.class} is not valid and cannot be published" unless @valid

  $exchanges ||= {}
  exchange_class = exchange # resolve the constant once instead of per use
  exchange_dest = ($exchanges[exchange_class.to_s] ||= exchange_class.new)

  # Encode before reading the publish options: #encode_message records the
  # encoding it applied in @options[:content_encoding], which is what
  # options[:content_encoding] sees when the default options are in effect.
  payload = encode_message
  exchange_dest.publish(payload,
                        routing_key: routing_key || '',
                        content_type: options[:content_type] || content_type,
                        content_encoding: options[:content_encoding] || content_encoding,
                        type: options[:type] || type,
                        priority: options[:priority] || priority,
                        expiration: options[:expiration] || expiration,
                        headers: headers)
end

#routing_key ⇒ Object



31
32
33
# File 'lib/legion/transport/message.rb', line 31

# Routing key for the published message. This implementation returns nil, in
# which case #publish sends an empty string as the routing key.
#
# @return [nil]
def routing_key
  nil
end

#timestamp ⇒ Object



101
102
103
# File 'lib/legion/transport/message.rb', line 101

# Current wall-clock time as whole seconds since the Unix epoch.
#
# Uses Time.now explicitly; a bare `now` is not a method available on this
# object and would raise NameError.
#
# @return [Integer]
def timestamp
  Time.now.to_i
end

#type ⇒ Object



97
98
99
# File 'lib/legion/transport/message.rb', line 97

# Fallback message type; #publish uses it when the publish options carry no
# :type value.
#
# @return [String] 'task'
def type
  'task'
end

#validate ⇒ Object



105
106
107
# File 'lib/legion/transport/message.rb', line 105

# Marks the message as valid. This implementation performs no checks and
# always sets @valid to true; #publish raises unless @valid is truthy.
#
# @return [true]
def validate
  @valid = true
end