Class: MCollective::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/mcollective/message.rb

Overview

container for a message, its headers, agent, collective and other meta data

Constant Summary collapse

VALIDTYPES =
[:message, :request, :direct_request, :reply].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(payload, message, options = {}) ⇒ Message

payload - the message body without headers etc, just the text
message - the original message received from the middleware
options[:base64] - is the body base64 encoded?
options[:agent] - the agent the message is for/from
options[:collective] - the collective it is for/from
options[:headers] - the message headers
options[:type] - an indicator about the type of message, :message, :request, :direct_request or :reply
options[:request] - if this is a reply this should hold the message we are replying to
options[:filter] - for requests, the filter to encode into the message
options[:options] - the normal client options hash
options[:ttl] - the maximum amount of seconds this message can be valid for
options[:expected_msgid] - in the case of replies this is the msgid it is expecting in the replies
options[:requestid] - specific request id to use else one will be generated



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/mcollective/message.rb', line 22

# Wraps a payload and its metadata in a new message.
#
# payload - the message body without headers
# message - the original message received from the middleware
# options - Hash of settings merged over these defaults:
#           :base64 (false), :agent (nil), :collective (nil), :headers ({}),
#           :type (:message), :request (nil), :filter (Util.empty_filter),
#           :options ({}), :ttl (60), :expected_msgid (nil), :requestid (nil)
#
# When :request is supplied the message becomes a :reply and takes its agent
# and collective from that request; the :agent, :collective and :type
# options are not used in that case.
#
# NOTE(review): the top-level :ttl entry is never read here. The TTL is taken
# from options[:options][:ttl], falling back to Config.instance.ttl. Confirm
# whether that is intended before relying on options[:ttl].
def initialize(payload, message, options={})
  defaults = {
    :base64 => false,
    :agent => nil,
    :headers => {},
    :type => :message,
    :request => nil,
    :filter => Util.empty_filter,
    :options => {},
    :ttl => 60,
    :expected_msgid => nil,
    :requestid => nil,
    :collective => nil
  }
  options = defaults.merge(options)

  # Raw content and identifiers.
  @payload = payload
  @message = message
  @requestid = options[:requestid]
  @discovered_hosts = nil
  @reply_to = nil

  # Per-message settings taken straight from the merged options.
  @type = options[:type]
  @headers = options[:headers]
  @base64 = options[:base64]
  @filter = options[:filter]
  @expected_msgid = options[:expected_msgid]
  @options = options[:options]

  # TTL comes from the nested client options hash, else the global config.
  @ttl = @options[:ttl] || Config.instance.ttl
  @msgtime = 0

  @validated = false

  replied_to = options[:request]
  if replied_to
    # A reply inherits its routing details from the request it answers.
    @request = replied_to
    @agent = request.agent
    @collective = request.collective
    @type = :reply
  else
    @agent = options[:agent]
    @collective = options[:collective]
  end

  # Normalise the payload to its decoded form before handing the object out.
  base64_decode!
end

Instance Attribute Details

#agentObject

Returns the value of attribute agent.



5
6
7
# File 'lib/mcollective/message.rb', line 5

# Returns the agent this message is for/from.
#
# Set in #initialize from options[:agent], or from the request's agent when
# a request was supplied; #decode! overwrites it when the decoded payload
# carries an :agent entry.
def agent
  @agent
end

#collectiveObject

Returns the value of attribute collective.



5
6
7
# File 'lib/mcollective/message.rb', line 5

# Returns the collective this message is for/from.
#
# Set in #initialize from options[:collective], or from the request's
# collective when a request was supplied; #decode! overwrites it when the
# decoded payload carries a :collective entry.
def collective
  @collective
end

#discovered_hostsObject

Returns the value of attribute discovered_hosts.



5
6
7
# File 'lib/mcollective/message.rb', line 5

# Returns the discovered hosts recorded on this message.
#
# Starts out as nil (see #initialize). #publish consults the value's size
# when deciding whether to send the message as a direct request.
def discovered_hosts
  @discovered_hosts
end

#expected_msgidObject

Returns the value of attribute expected_msgid.



4
5
6
# File 'lib/mcollective/message.rb', line 4

# Returns the value given as options[:expected_msgid] to #initialize
# (nil by default).
def expected_msgid
  @expected_msgid
end

#filterObject

Returns the value of attribute filter.



5
6
7
# File 'lib/mcollective/message.rb', line 5

# Returns the filter associated with this message.
#
# Set in #initialize from options[:filter] (default Util.empty_filter);
# #decode! overwrites it when the decoded payload carries a :filter entry,
# and #encode! passes it to the security plugin for requests.
def filter
  @filter
end

#headersObject

Returns the value of attribute headers.



5
6
7
# File 'lib/mcollective/message.rb', line 5

# Returns the message headers given as options[:headers] to #initialize
# (an empty Hash by default).
#
# #decode! reads headers['mc_sender'] when logging a reply it failed to
# decode.
def headers
  @headers
end

#messageObject (readonly)

Returns the value of attribute message.



4
5
6
# File 'lib/mcollective/message.rb', line 4

# Returns the second argument passed to #initialize, stored unchanged.
def message
  @message
end

#msgtimeObject (readonly)

Returns the value of attribute msgtime.



4
5
6
# File 'lib/mcollective/message.rb', line 4

# Returns the message timestamp.
#
# Initialised to 0 in #initialize and replaced by #decode! when the decoded
# payload carries a :msgtime entry. #validate subtracts it from the current
# UTC epoch seconds to compute the message age.
def msgtime
  @msgtime
end

#optionsObject

Returns the value of attribute options.



5
6
7
# File 'lib/mcollective/message.rb', line 5

# Returns the nested options hash given as options[:options] to #initialize
# (an empty Hash by default). #initialize reads its :ttl entry to set the
# message TTL.
def options
  @options
end

#payloadObject (readonly)

Returns the value of attribute payload.



4
5
6
# File 'lib/mcollective/message.rb', line 4

# Returns the current payload.
#
# Starts as the first argument to #initialize and is replaced in place by
# #base64_decode!, #base64_encode!, #decode! and #encode!, so its form
# depends on which of those have run.
def payload
  @payload
end

#reply_toObject

Returns the value of attribute reply_to.



4
5
6
# File 'lib/mcollective/message.rb', line 4

# Returns the reply-to value for this message; #initialize sets it to nil.
def reply_to
  @reply_to
end

#requestObject (readonly)

Returns the value of attribute request.



4
5
6
# File 'lib/mcollective/message.rb', line 4

# Returns the request this message replies to.
#
# Only assigned when options[:request] was supplied to #initialize;
# otherwise the instance variable is never set and this returns nil.
def request
  @request
end

#requestidObject

Returns the value of attribute requestid.



5
6
7
# File 'lib/mcollective/message.rb', line 5

# Returns the request id.
#
# Set in #initialize from options[:requestid] (nil by default). #encode!
# copies it from the original request for replies and generates one with
# #create_reqid for requests that have none; #decode! overwrites it when the
# decoded payload carries a :requestid entry.
def requestid
  @requestid
end

#ttlObject

Returns the value of attribute ttl.



5
6
7
# File 'lib/mcollective/message.rb', line 5

# Returns the message TTL that #validate compares the message age against.
#
# #initialize takes it from the :ttl entry of the nested options hash,
# falling back to Config.instance.ttl; #decode! overwrites it when the
# decoded payload carries a :ttl entry.
def ttl
  @ttl
end

#typeObject

Returns the value of attribute type.



4
5
6
# File 'lib/mcollective/message.rb', line 4

# Returns the message type.
#
# #initialize sets it from options[:type] (default :message) and forces it
# to :reply when a request was supplied.
def type
  @type
end

#validatedObject (readonly)

Returns the value of attribute validated.



4
5
6
# File 'lib/mcollective/message.rb', line 4

# Returns whether #validate has completed successfully: false after
# #initialize, true once #validate reaches its final line without raising.
def validated
  @validated
end

Instance Method Details

#base64?Boolean

Returns:

  • (Boolean)


127
128
129
# File 'lib/mcollective/message.rb', line 127

# Returns the base64 flag for the payload.
#
# The flag is seeded from options[:base64] in #initialize, set to false by
# #base64_decode! and to true by #base64_encode!. The raw instance variable
# is returned, so a non-boolean falsy value such as nil comes back as-is.
def base64?
  @base64
end

#base64_decode!Object



113
114
115
116
117
118
# File 'lib/mcollective/message.rb', line 113

# Decodes the payload in place when it is flagged as base64-encoded.
#
# Does nothing (and returns nil) when the flag is not set. Otherwise the
# payload is replaced with the result of SSL.base64_decode and the flag is
# cleared, so calling this repeatedly is harmless.
def base64_decode!
  if @base64
    @payload = SSL.base64_decode(@payload)
    @base64 = false
  end
end

#base64_encode!Object



120
121
122
123
124
125
# File 'lib/mcollective/message.rb', line 120

# Encodes the payload in place unless it is already flagged as base64.
#
# Does nothing (and returns nil) when the flag is already set. Otherwise the
# payload is replaced with the result of SSL.base64_encode and the flag is
# raised, so calling this repeatedly never double-encodes.
def base64_encode!
  unless @base64
    @payload = SSL.base64_encode(@payload)
    @base64 = true
  end
end

#create_reqidObject



212
213
214
215
216
217
# File 'lib/mcollective/message.rb', line 212

# Generates a fresh request id from SSL.uuid.
#
# The hyphens are stripped so the id keeps the same format as in previous
# versions; these ids should simply be more unique than the old ones.
def create_reqid
  SSL.uuid.delete("-")
end

#decode!Object



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/mcollective/message.rb', line 158

# Decodes the payload through the security plugin and copies routing
# properties out of the decoded result.
#
# Only :request and :reply messages can be decoded; any other type raises.
# A decoding failure is re-raised for requests, but only logged for replies,
# in which case the method returns nil with the payload left untouched.
# For requests the decoded callerid must also be accepted by the security
# plugin, otherwise a RuntimeError is raised.
def decode!
  unless [:request, :reply].include?(type)
    raise "Cannot decode message type #{type}"
  end

  begin
    @payload = PluginManager["security_plugin"].decodemsg(self)
  rescue Exception => e # rubocop:disable Lint/RescueException
    # A server receiving a request must see the failure.
    raise(e) if type == :request

    # Otherwise we are in the client: log and carry on as best we can.
    # NOTE: mc_sender is unverified. The verified identity is in the
    # payload we just failed to decode.
    Log.warn("Failed to decode a message from '#{headers['mc_sender']}': #{e}")
    return
  end

  if type == :request
    caller_accepted = PluginManager["security_plugin"].valid_callerid?(payload[:callerid])
    raise "callerid in request is not valid, surpressing reply to potentially forged request" unless caller_accepted
  end

  # Adopt whichever routing properties the decoded payload provides.
  [:collective, :agent, :filter, :requestid, :ttl, :msgtime].each do |prop|
    next unless payload.include?(prop)

    instance_variable_set("@#{prop}", payload[prop])
  end
end

#descriptionObject



131
132
133
134
135
136
137
# File 'lib/mcollective/message.rb', line 131

# Builds a one-line, human-readable summary of the message for logs and
# error text, in the form:
#
#   "<requestid> for agent '<agent>' in collective '<collective>' from <origin>"
#
# where <origin> is "<callerid>@<senderid>" when the payload has a :callerid
# key, and just "<senderid>" otherwise.
#
# A missing or non-String :senderid is rendered via #to_s instead of raising
# TypeError. #validate calls this method while constructing its exceptions,
# so it must not raise and hide the real error.
#
# Returns a String. Expects the payload to respond to #include? and #[] with
# Symbol keys, as the original did.
def description
  sender = payload[:senderid].to_s
  origin = payload.include?(:callerid) ? "#{payload[:callerid]}@#{sender}" : sender

  "#{requestid} for agent '#{agent}' in collective '#{collective}' from #{origin}"
end

#encode!Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/mcollective/message.rb', line 139

# Encodes the payload through the security plugin, according to the
# message type.
#
# :reply                    - requires an associated request whose callerid
#                             the security plugin accepts; the request id is
#                             taken from that request.
# :request, :direct_request - keeps an existing request id or generates one
#                             with #create_reqid.
# any other type            - raises RuntimeError.
#
# Returns the encoded payload, which also replaces the stored payload.
def encode!
  kind = type

  if kind == :reply
    raise "Cannot encode a reply message if no request has been associated with it" unless request

    caller_id = request.payload[:callerid]
    unless PluginManager["security_plugin"].valid_callerid?(caller_id)
      raise "callerid in original request is not valid, surpressing reply to potentially forged request"
    end

    # A reply always carries the id of the request it answers.
    @requestid = request.payload[:requestid]
    @payload = PluginManager["security_plugin"].encodereply(agent, payload, requestid, caller_id)
  elsif kind == :request || kind == :direct_request
    @requestid = create_reqid unless @requestid
    @payload = PluginManager["security_plugin"].encoderequest(Config.instance.identity, payload, requestid, filter, agent, collective, ttl)
  else
    raise "Cannot encode #{kind} messages"
  end
end

#publishObject

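Publishes the message through the connector plugin, first switching it to a direct request when discovered hosts are known, direct addressing is enabled and the host count is within the configured threshold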



199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/mcollective/message.rb', line 199

# Hands the message to the connector plugin for publishing.
#
# When hosts have been recorded in @discovered_hosts, direct addressing is
# enabled in the config, and the number of hosts does not exceed the
# configured threshold, the message type is switched to :direct_request
# first. Otherwise the message is sent as is.
#
# Returns whatever the connector plugin's publish call returns.
def publish
  settings = Config.instance
  targets = @discovered_hosts

  go_direct = targets &&
              settings.direct_addressing &&
              (targets.size <= settings.direct_addressing_threshold)

  if go_direct
    self.type = :direct_request
    Log.debug("Handling #{requestid} as a direct request")
  end

  PluginManager["connector_plugin"].publish(self)
end

#validateObject

Perform validation against the message by checking filters and ttl

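Raises:

  • (RuntimeError) — if the message is not of type :request
  • (MsgTTLExpired) — if the message is older than its TTL
  • (NotTargettedAtUs) — if the message does not pass the security plugin's filter check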



187
188
189
190
191
192
193
194
195
196
# File 'lib/mcollective/message.rb', line 187

# Validates a request message by checking its age against the TTL and its
# filter against the security plugin.
#
# Raises RuntimeError when the message is not a :request, MsgTTLExpired
# when the message age in seconds exceeds the TTL, and NotTargettedAtUs
# when the security plugin rejects the payload's filter.
#
# Marks the message as validated and returns true on success.
def validate
  unless type == :request
    raise "Can only validate request messages"
  end

  # Age is measured in whole seconds against the current UTC time.
  age = Time.now.utc.to_i - msgtime

  if age > ttl
    raise MsgTTLExpired, "Message #{description} created at #{msgtime} is #{age} seconds old, TTL is #{ttl}. Rejecting message."
  end

  for_us = PluginManager["security_plugin"].validate_filter?(payload[:filter])
  unless for_us
    raise NotTargettedAtUs, "Message #{description} does not pass filters. Ignoring message."
  end

  @validated = true
end