Class: MCollective::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/mcollective/message.rb

Overview

container for a message, its headers, agent, collective and other meta data

Constant Summary collapse

VALIDTYPES =
[:message, :request, :direct_request, :reply]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(payload, message, options = {}) ⇒ Message

payload - the message body without headers etc, just the text message - the original message received from the middleware options - if the body base64 encoded? options - the agent the message is for/from options - the collective its for/from options - the message headers options - an indicator about the type of message, :message, :request, :direct_request or :reply options - if this is a reply this should old the message we are replying to options - for requests, the filter to encode into the message options - the normal client options hash options - the maximum amount of seconds this message can be valid for options - in the case of replies this is the msgid it is expecting in the replies options - specific request id to use else one will be generated



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/mcollective/message.rb', line 23

def initialize(payload, message, options = {})
  options = {:base64 => false,
             :agent => nil,
             :headers => {},
             :type => :message,
             :request => nil,
             :filter => Util.empty_filter,
             :options => {},
             :ttl => 60,
             :expected_msgid => nil,
             :requestid => nil,
             :collective => nil}.merge(options)

  @payload = payload
  @message = message
  @requestid = options[:requestid]
  @discovered_hosts = nil
  @reply_to = nil

  @type = options[:type]
  @headers = options[:headers]
  @base64 = options[:base64]
  @filter = options[:filter]
  @expected_msgid = options[:expected_msgid]
  @options = options[:options]

  @ttl = @options[:ttl] || Config.instance.ttl
  @msgtime = 0

  @validated = false

  if options[:request]
    @request = options[:request]
    @agent = request.agent
    @collective = request.collective
    @type = :reply
  else
    @agent = options[:agent]
    @collective = options[:collective]
  end

  base64_decode!
end

Instance Attribute Details

#agentObject

Returns the value of attribute agent.



5
6
7
# File 'lib/mcollective/message.rb', line 5

def agent
  @agent
end

#collectiveObject

Returns the value of attribute collective.



5
6
7
# File 'lib/mcollective/message.rb', line 5

def collective
  @collective
end

#discovered_hostsObject

Returns the value of attribute discovered_hosts.



6
7
8
# File 'lib/mcollective/message.rb', line 6

def discovered_hosts
  @discovered_hosts
end

#expected_msgidObject

Returns the value of attribute expected_msgid.



4
5
6
# File 'lib/mcollective/message.rb', line 4

def expected_msgid
  @expected_msgid
end

#filterObject

Returns the value of attribute filter.



5
6
7
# File 'lib/mcollective/message.rb', line 5

def filter
  @filter
end

#headersObject

Returns the value of attribute headers.



5
6
7
# File 'lib/mcollective/message.rb', line 5

def headers
  @headers
end

#messageObject (readonly)

Returns the value of attribute message.



4
5
6
# File 'lib/mcollective/message.rb', line 4

def message
  @message
end

#msgtimeObject (readonly)

Returns the value of attribute msgtime.



4
5
6
# File 'lib/mcollective/message.rb', line 4

def msgtime
  @msgtime
end

#optionsObject

Returns the value of attribute options.



6
7
8
# File 'lib/mcollective/message.rb', line 6

def options
  @options
end

#payloadObject (readonly)

Returns the value of attribute payload.



4
5
6
# File 'lib/mcollective/message.rb', line 4

def payload
  @payload
end

#reply_toObject

Returns the value of attribute reply_to.



4
5
6
# File 'lib/mcollective/message.rb', line 4

def reply_to
  @reply_to
end

#requestObject (readonly)

Returns the value of attribute request.



4
5
6
# File 'lib/mcollective/message.rb', line 4

def request
  @request
end

#requestidObject

Returns the value of attribute requestid.



6
7
8
# File 'lib/mcollective/message.rb', line 6

def requestid
  @requestid
end

#ttlObject

Returns the value of attribute ttl.



6
7
8
# File 'lib/mcollective/message.rb', line 6

def ttl
  @ttl
end

#typeObject

Returns the value of attribute type.



4
5
6
# File 'lib/mcollective/message.rb', line 4

def type
  @type
end

#validatedObject (readonly)

Returns the value of attribute validated.



4
5
6
# File 'lib/mcollective/message.rb', line 4

def validated
  @validated
end

Instance Method Details

#base64?Boolean

Returns:

  • (Boolean)


129
130
131
# File 'lib/mcollective/message.rb', line 129

def base64?
  @base64
end

#base64_decode!Object



115
116
117
118
119
120
# File 'lib/mcollective/message.rb', line 115

def base64_decode!
  return unless @base64

  @payload = SSL.base64_decode(@payload)
  @base64 = false
end

#base64_encode!Object



122
123
124
125
126
127
# File 'lib/mcollective/message.rb', line 122

def base64_encode!
  return if @base64

  @payload = SSL.base64_encode(@payload)
  @base64 = true
end

#create_reqidObject



226
227
228
229
230
231
# File 'lib/mcollective/message.rb', line 226

def create_reqid
  # we gsub out the -s so that the format of the id does not
  # change from previous versions, these should just be more
  # unique than previous ones
  SSL.uuid.gsub("-", "")
end

#decode!Object



175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/mcollective/message.rb', line 175

def decode!
  raise "Cannot decode message type #{type}" unless [:request, :reply].include?(type)

  @payload = PluginManager["security_plugin"].decodemsg(self)

  if type == :request
    raise 'callerid in request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(payload[:callerid])
  end

  [:collective, :agent, :filter, :requestid, :ttl, :msgtime].each do |prop|
    instance_variable_set("@#{prop}", payload[prop]) if payload.include?(prop)
  end
end

#encode!Object



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/mcollective/message.rb', line 133

def encode!
  case type
    when :reply
      raise "Cannot encode a reply message if no request has been associated with it" unless request
      raise 'callerid in original request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(request.payload[:callerid])

      @requestid = request.payload[:requestid]
      @payload = PluginManager["security_plugin"].encodereply(agent, payload, requestid, request.payload[:callerid])
    when :request, :direct_request
      validate_compound_filter(@filter["compound"]) unless @filter["compound"].empty?

      @requestid = create_reqid unless @requestid
      @payload = PluginManager["security_plugin"].encoderequest(Config.instance.identity, payload, requestid, filter, agent, collective, ttl)
    else
      raise "Cannot encode #{type} messages"
  end
end

#publishObject

publish a reply message by creating a target name and sending it



213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/mcollective/message.rb', line 213

def publish
  # If we've been specificaly told about hosts that were discovered
  # use that information to do P2P calls if appropriate else just
  # send it as is.
  config = Config.instance
  if @discovered_hosts && config.direct_addressing && (@discovered_hosts.size <= config.direct_addressing_threshold)
    self.type = :direct_request
    Log.debug("Handling #{requestid} as a direct request")
  end

  PluginManager['connector_plugin'].publish(self)
end

#validateObject

Perform validation against the message by checking filters and ttl

Raises:



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/mcollective/message.rb', line 190

def validate
  raise "Can only validate request messages" unless type == :request

  msg_age = Time.now.utc.to_i - msgtime

  if msg_age > ttl
    cid = ""
    cid += payload[:callerid] + "@" if payload.include?(:callerid)
    cid += payload[:senderid]

    if msg_age > ttl
      PluginManager["global_stats"].ttlexpired

      raise(MsgTTLExpired, "message #{requestid} from #{cid} created at #{msgtime} is #{msg_age} seconds old, TTL is #{ttl}")
    end
  end

  raise(NotTargettedAtUs, "Received message is not targetted to us") unless PluginManager["security_plugin"].validate_filter?(payload[:filter])

  @validated = true
end

#validate_compound_filter(compound_filter) ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/mcollective/message.rb', line 151

def validate_compound_filter(compound_filter)
  compound_filter.each do |filter|
    filter.each do |statement|
      if statement["fstatement"]
        functionname = statement["fstatement"]["name"]
        pluginname = Data.pluginname(functionname)
        value = statement["fstatement"]["value"]

        ddl = DDL.new(pluginname, :data)

        # parses numbers and booleans entered as strings into proper
        # types of data so that DDL validation will pass
        statement["fstatement"]["params"] = Data.ddl_transform_input(ddl, statement["fstatement"]["params"])

        Data.ddl_validate(ddl, statement["fstatement"]["params"])

        unless value && Data.ddl_has_output?(ddl, value)
          DDL.validation_fail!(:PLMC41, "Data plugin '%{functionname}()' does not return a '%{value}' value", :error, {:functionname => functionname, :value => value})
        end
      end
    end
  end
end