Class: MCollective::Connector::Nats
- Defined in:
- lib/mcollective/connector/nats.rb
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
Instance Method Summary collapse
-
#active_options ⇒ Hash
Connection options from the NATS gem.
- #choria ⇒ Object
-
#client_flavour ⇒ String
Client library flavour.
-
#client_version ⇒ String
Client library version.
- #configure_ngs(parameters) ⇒ Object
- #configure_tls(parameters) ⇒ Object
-
#connect ⇒ void
Attempts to connect to the middleware, noop when already connected.
-
#connected? ⇒ Boolean
Determines if the NATS connection is active.
-
#connected_server ⇒ String?
Current connected server.
-
#current_pid ⇒ Fixnum
Retrieves the current process pid.
-
#decorate_servers_with_users(servers) ⇒ Array<URI>
Add user and pass to a series of URIs.
-
#disconnect ⇒ Object
Disconnects from NATS.
-
#environment ⇒ Object
Retrieves the environment, mainly used for testing.
-
#get_option(opt, default = :_unset) ⇒ Object
Gets a config option.
-
#headers_for(msg) ⇒ Hash
Creates the middleware headers needed for a given message.
-
#initialize ⇒ Nats
constructor
Creates the connector and its NATS connection wrapper (the source definition carries a rubocop:disable Lint/MissingSuper directive).
-
#make_target(agent, type, collective, identity = nil) ⇒ String
Creates a target structure.
-
#publish(msg) ⇒ Object
Publishes a message to the middleware.
-
#publish_connected_broadcast(msg) ⇒ Object
Publish a broadcast message to a connected collective.
-
#publish_connected_directed(msg) ⇒ Object
Publish a directed request to a connected collective.
-
#publish_federated_broadcast(msg) ⇒ Object
Publish a broadcast message via a Federation Broker.
-
#publish_federated_directed(msg) ⇒ Object
Publish a directed request via a Federation Broker.
-
#receive ⇒ Message
Receives a message from the middleware.
-
#server_list ⇒ Array<String>
Retrieves the list of server and port combos to attempt to connect to.
-
#stats ⇒ Hash
Retrieves the NATS connection stats.
-
#subscribe(agent, type, collective) ⇒ void
Subscribes to the topics/queues needed for a particular agent.
-
#target_for(msg, identity = nil) ⇒ Hash
Create a target structure for a message.
-
#unsubscribe(agent, type, collective) ⇒ void
Unsubscribe from the target for an agent.
Methods inherited from Base
Constructor Details
#initialize ⇒ Nats
Creates the connector and its NATS connection wrapper (the source definition carries a rubocop:disable Lint/MissingSuper directive)
10 11 12 13 14 15 16 |
# File 'lib/mcollective/connector/nats.rb', line 10
#
# Sets up the connector state: the shared configuration instance, an empty
# subscription list and a fresh NATS wrapper, then logs which client library
# and protocol version are in use.
#
# The parent initializer is intentionally not called, hence the rubocop
# directive on the definition.
def initialize # rubocop:disable Lint/MissingSuper
  @config = Config.instance
  @subscriptions = []
  @connection = Util::NatsWrapper.new

  client_details = [NATS::IO::VERSION, NATS::IO::PROTOCOL]
  Log.info("Choria NATS.io connector using pure ruby nats/io/client %s with protocol version %s" % client_details)
end
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
8 9 10 |
# File 'lib/mcollective/connector/nats.rb', line 8
#
# Read-only accessor for the connection object stored in @connection.
#
# @return [Object] the value of the connection attribute
def connection
  @connection
end
Instance Method Details
#active_options ⇒ Hash
Connection options from the NATS gem
56 57 58 |
# File 'lib/mcollective/connector/nats.rb', line 56
#
# Connection options from the NATS gem, as reported by the connection wrapper.
#
# NOTE(review): the extracted listing lost the method name and the delegated
# call ("def connection. end"); both were reconstructed from the method
# summary - confirm against the original file.
#
# @return [Hash]
def active_options
  connection.active_options
end
#choria ⇒ Object
445 446 447 |
# File 'lib/mcollective/connector/nats.rb', line 445
#
# Memoized Util::Choria helper; it is built on first use with `false` as its
# only constructor argument and reused afterwards.
#
# @return [Object] the cached Util::Choria instance
def choria
  return @_choria if @_choria

  @_choria = Util::Choria.new(false)
end
#client_flavour ⇒ String
Client library flavour
49 50 51 |
# File 'lib/mcollective/connector/nats.rb', line 49
#
# Client library flavour, as reported by the connection wrapper.
#
# @return [String]
def client_flavour
  flavour = connection.client_flavour
  flavour
end
#client_version ⇒ String
Client library version
42 43 44 |
# File 'lib/mcollective/connector/nats.rb', line 42
#
# Client library version, as reported by the connection wrapper.
#
# @return [String]
def client_version
  version = connection.client_version
  version
end
#configure_ngs(parameters) ⇒ Object
104 105 106 107 108 109 110 111 112 113 |
# File 'lib/mcollective/connector/nats.rb', line 104
#
# Adds TLS settings for an NGS connection to the given connection parameters.
#
# A plain TLS 1.2 context is used here instead of the context supplied by
# choria.ssl_context (compare configure_tls).
#
# @param parameters [Hash] connection parameters, modified in place
# @raise [RuntimeError] when choria reports that nkeys is not available
def configure_ngs(parameters)
  Log.debug("Disabling specific TLS during connection to NGS")

  unless choria.nkeys?
    raise("nkeys rubygem is required for connections with credentials")
  end

  context = OpenSSL::SSL::SSLContext.new
  context.ssl_version = :TLSv1_2 # rubocop:disable Naming/VariableNumber

  parameters[:tls] = {:context => context}
end
#configure_tls(parameters) ⇒ Object
99 100 101 102 |
# File 'lib/mcollective/connector/nats.rb', line 99
#
# Adds the Choria supplied SSL context to the connection parameters and then
# asks choria to check the SSL setup.
#
# @param parameters [Hash] connection parameters, modified in place
# @return [Object] whatever choria.check_ssl_setup returns
def configure_tls(parameters)
  context = choria.ssl_context
  parameters[:tls] = {:context => context}

  choria.check_ssl_setup
end
#connect ⇒ void
This method returns an undefined value.
Attempts to connect to the middleware, noop when already connected
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/mcollective/connector/nats.rb', line 64
#
# Attempts to connect to the middleware, noop when already connected.
#
# Builds the connection parameters (unlimited reconnects with a 1 second wait,
# server randomisation as configured, identity as the connection name),
# optionally adds a credentials file, picks the TLS setup and finally starts
# the connection.
#
# @return [void]
def connect
  already_started = connection && connection.started?

  if already_started
    Log.debug("Already connection, not re-initializing connection")
    return
  end

  keep_server_order = !choria.randomize_middleware_servers?

  parameters = {
    :max_reconnect_attempts => -1,
    :reconnect_time_wait => 1,
    :dont_randomize_servers => keep_server_order,
    :name => @config.identity
  }

  if choria.credential_file?
    parameters[:user_credentials] = choria.credential_file
  end

  # TLS is only skipped when the unsafe global is set; otherwise NGS and
  # regular deployments each get their own TLS configuration
  if $choria_unsafe_disable_nats_tls # rubocop:disable Style/GlobalVars
    Log.warn("Disabling TLS in NATS connector, this is not a production supported setup")
  elsif choria.ngs?
    configure_ngs(parameters)
  else
    configure_tls(parameters)
  end

  candidates = server_list

  # an empty list leaves :servers unset
  if !candidates.empty?
    Log.debug("Connecting to servers: %s" % candidates.join(", "))
    parameters[:servers] = candidates
  end

  connection.start(parameters)

  nil
end
#connected? ⇒ Boolean
Determines if the NATS connection is active
21 22 23 |
# File 'lib/mcollective/connector/nats.rb', line 21
#
# Determines if the NATS connection is active by asking the connection wrapper.
#
# @return [Boolean]
def connected?
  active = connection.connected?
  active
end
#connected_server ⇒ String?
Current connected server
28 29 30 |
# File 'lib/mcollective/connector/nats.rb', line 28
#
# Current connected server, as reported by the connection wrapper.
#
# @return [String, nil]
def connected_server
  server = connection.connected_server
  server
end
#current_pid ⇒ Fixnum
Retrieves the current process pid; mainly used for testing
192 193 194 |
# File 'lib/mcollective/connector/nats.rb', line 192
#
# Retrieves the current process pid.
#
# Uses Process.pid rather than the Perl-style `$$` special global; both yield
# the pid of the running process, the former is self-explanatory.
#
# @return [Integer]
def current_pid
  Process.pid
end
#decorate_servers_with_users(servers) ⇒ Array<URI>
Add user and pass to a series of URIs
408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 |
# File 'lib/mcollective/connector/nats.rb', line 408
#
# Add user and pass to a series of URIs.
#
# Credentials come from the nats.user / nats.pass plugin options, defaulting to
# the MCOLLECTIVE_NATS_USERNAME / MCOLLECTIVE_NATS_PASSWORD environment values.
# When choria reports anonymous TLS, the security plugin's request signer token
# is used as the user and the password is cleared.
#
# @param servers [Array<URI>] URIs to decorate; modified in place
# @return [Array<URI>] the same list that was passed in
def decorate_servers_with_users(servers)
  user = get_option("nats.user", environment["MCOLLECTIVE_NATS_USERNAME"])
  pass = get_option("nats.pass", environment["MCOLLECTIVE_NATS_PASSWORD"])

  if choria.anon_tls?
    user = PluginManager["security_plugin"].request_signer.token
    pass = nil
  end

  # nothing to decorate when neither value is set
  return servers unless user || pass

  servers.each do |uri|
    uri.user = user
    uri.password = pass
  end

  servers
end
#disconnect ⇒ Object
Disconnects from NATS
116 117 118 |
# File 'lib/mcollective/connector/nats.rb', line 116
#
# Disconnects from NATS by stopping the connection wrapper.
#
# @return [Object] whatever the wrapper's stop returns
def disconnect
  outcome = connection.stop
  outcome
end
#environment ⇒ Object
Retrieves the environment, mainly used for testing
428 429 430 |
# File 'lib/mcollective/connector/nats.rb', line 428
#
# Retrieves the environment, mainly used for testing.
#
# @return [Object] the process environment (ENV)
def environment
  ENV
end
#get_option(opt, default = :_unset) ⇒ Object
Gets a config option
438 439 440 441 442 443 |
# File 'lib/mcollective/connector/nats.rb', line 438
#
# Gets a config option from the plugin configuration.
#
# A configured value always wins; otherwise the supplied default is returned.
# The :_unset sentinel marks "no default given" so that nil can be a default.
#
# @param opt [String] plugin option name
# @param default [Object] value to return when the option is not configured
# @return [Object] the configured value or the default
# @raise [RuntimeError] when the option is not configured and no default was given
def get_option(opt, default=:_unset)
  if @config.pluginconf.include?(opt)
    @config.pluginconf[opt]
  elsif default == :_unset
    raise("No plugin.%s configuration option given" % opt)
  else
    default
  end
end
#headers_for(msg) ⇒ Hash
Creates the middleware headers needed for a given message
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/mcollective/connector/nats.rb', line 124
#
# Creates the middleware headers needed for a given message.
#
# Requests get a reply-to header (the message's own, or a freshly made reply
# target). When the message carries a "seen-by" header the route is recorded:
# requests start a new list with this identity and the connected server,
# replies reuse the list from the request and append the connected server to
# its last entry.
#
# @param msg [Object] message being published
# @return [Hash] the headers
def headers_for(msg)
  # mc_sender is only passed because M::Message incorrectly assumed this is some required
  # part of messages when it's just some internals of the stomp based connectors that bled out
  headers = {"mc_sender" => @config.identity}

  track_route = msg.headers.include?("seen-by")
  headers["seen-by"] = [] if track_route

  case msg.type
  when :request, :direct_request
    # if it's a request/direct_request style message and it's not
    # one we're replying to - ie. it's a new message we're making -
    # we'll need to set a reply-to target that the daemon will
    # subscribe to
    headers["reply-to"] = msg.reply_to || make_target(msg.agent, :reply, msg.collective)

    headers["seen-by"] << [@config.identity, connected_server.to_s] if track_route
  when :reply
    request_headers = msg.request.headers

    if request_headers.include?("seen-by")
      # the request's own list is reused (not copied) and extended in place
      headers["seen-by"] = request_headers["seen-by"]
      headers["seen-by"].last << connected_server.to_s
    end
  end

  headers
end
#make_target(agent, type, collective, identity = nil) ⇒ String
Creates a target structure
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/mcollective/connector/nats.rb', line 204
#
# Creates a target structure (the NATS subject name) for an agent, target type
# and collective.
#
# @param agent [String] agent name, used for broadcast targets
# @param type [Symbol] one of :directed, :broadcast, :reply, :request, :direct_request
# @param collective [String] a collective listed in the configuration
# @param identity [String, nil] node identity for directed targets, defaults to the configured identity
# @return [String] the target name
# @raise [RuntimeError] on an unknown type or collective
def make_target(agent, type, collective, identity=nil)
  known_types = [:directed, :broadcast, :reply, :request, :direct_request]

  unless known_types.include?(type)
    raise("Unknown target type %s" % type)
  end

  unless @config.collectives.include?(collective)
    raise("Unknown collective '%s' known collectives are '%s'" % [collective, @config.collectives.join(", ")])
  end

  identity ||= @config.identity

  if type == :reply
    # reply targets are made unique per caller, process and request sequence
    "%s.reply.%s.%d.%d" % [collective, Digest::MD5.hexdigest(choria.callerid), current_pid, Client.request_sequence]
  elsif [:broadcast, :request].include?(type)
    "%s.broadcast.agent.%s" % [collective, agent]
  else
    # only :direct_request and :directed remain after the validation above
    "%s.node.%s" % [collective, identity]
  end
end
#publish(msg) ⇒ Object
Publishes a message to the middleware
226 227 228 229 230 231 232 233 234 |
# File 'lib/mcollective/connector/nats.rb', line 226
#
# Publishes a message to the middleware.
#
# The message is base64 encoded first, then handed to one of four publishers
# depending on whether choria reports a federated setup and whether the
# message is a direct request.
#
# @param msg [Object] message to publish
# @return [Object] whatever the selected publisher returns
def publish(msg)
  msg.base64_encode!

  if choria.federated?
    return publish_federated_directed(msg) if msg.type == :direct_request

    return publish_federated_broadcast(msg)
  end

  return publish_connected_directed(msg) if msg.type == :direct_request

  publish_connected_broadcast(msg)
end
#publish_connected_broadcast(msg) ⇒ Object
Publish a broadcast message to a connected collective
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 |
# File 'lib/mcollective/connector/nats.rb', line 321
#
# Publish a broadcast message to a connected collective.
#
# NOTE(review): the extracted listing lost the name of the local that holds
# msg.request ("= msg.request ... if && .headers"); it is restored here as
# `request` - confirm against the original file.
#
# @param msg [Object] message to publish
# @return [Object] whatever connection.publish returns
def publish_connected_broadcast(msg)
  target = target_for(msg)

  data = {
    "protocol" => "choria:transport:1",
    "data" => msg.payload,
    "headers" => target[:headers]
  }

  # only happens when replying: carry the request's federation header back
  request = msg.request
  data["headers"]["federation"] = request.headers["federation"] if request && request.headers.include?("federation")

  Log.debug("Sending a broadcast message to NATS target '%s' for message type %s" % [target.inspect, msg.type])

  connection.publish(target[:name], JSON.dump(data), target[:headers]["reply-to"])
end
#publish_connected_directed(msg) ⇒ Object
Publish a directed request to a connected collective
276 277 278 279 280 281 282 283 284 285 286 287 288 289 |
# File 'lib/mcollective/connector/nats.rb', line 276
#
# Publish a directed request to a connected collective.
#
# Every discovered host gets its own target and its own copy of the message.
#
# @param msg [Object] message to publish
# @return [Object] the list of discovered hosts that was iterated
def publish_connected_directed(msg)
  msg.discovered_hosts.each do |node|
    target = target_for(msg, node)
    subject = target[:name]
    headers = target[:headers]

    envelope = {
      "protocol" => "choria:transport:1",
      "data" => msg.payload,
      "headers" => headers
    }

    Log.debug("Sending a direct message to %s via NATS target '%s' for message type %s" % [node, target.inspect, msg.type])

    connection.publish(subject, envelope.to_json, headers["reply-to"])
  end
end
#publish_federated_broadcast(msg) ⇒ Object
Publish a broadcast message to via a Federation Broker
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 |
# File 'lib/mcollective/connector/nats.rb', line 294
#
# Publish a broadcast message via a Federation Broker.
#
# The real target travels in the "federation" header; the message itself is
# published once to the federation target of each federation collective.
#
# @param msg [Object] message to publish
# @return [Object] the list of federation collectives that was iterated
def publish_federated_broadcast(msg)
  target = target_for(msg)

  envelope = {
    "protocol" => "choria:transport:1",
    "data" => msg.payload,
    "headers" => {
      "federation" => {
        "target" => [target[:name]],
        "req" => msg.requestid
      }
    }.merge(target[:headers])
  }

  # serialized once, before the target name is repointed below
  encoded = JSON.dump(envelope)
  reply_to = target[:headers]["reply-to"]

  choria.federation_collectives.each do |network|
    target[:name] = "choria.federation.%s.federation" % network

    Log.debug("Sending a federated broadcast message to NATS target '%s' for message type %s" % [target.inspect, msg.type])

    connection.publish(target[:name], encoded, reply_to)
  end
end
#publish_federated_directed(msg) ⇒ Object
Publish a directed request via a Federation Broker
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 |
# File 'lib/mcollective/connector/nats.rb', line 239
#
# Publish a directed request via a Federation Broker.
#
# Discovered hosts are split into batches of 200; each batch becomes one
# message whose "federation" header lists the node targets. Every batch is
# then published to the federation target of each federation collective.
#
# NOTE(review): the extracted listing lost the name of the accumulator array
# ("= []", "<< JSON.dump(data)", ".each do |data|"); it is restored here as
# `messages` - confirm against the original file.
#
# @param msg [Object] message to publish
# @return [Object] the list of federation collectives that was iterated
def publish_federated_directed(msg)
  messages = []
  target = target_for(msg, msg.discovered_hosts[0])

  # each_slice is the standard library batching call; unlike in_groups_of it
  # does not pad the final batch. compact is kept so nil hosts are still dropped.
  msg.discovered_hosts.each_slice(200) do |nodes|
    node_targets = nodes.compact.map do |node|
      target_for(msg, node)[:name]
    end

    data = {
      "protocol" => "choria:transport:1",
      "data" => msg.payload,
      "headers" => {
        "federation" => {
          "target" => node_targets,
          "req" => msg.requestid
        }
      }.merge(target[:headers])
    }

    messages << JSON.dump(data)
  end

  choria.federation_collectives.each do |network|
    # the same for every batch, so computed once per network
    network_target = "choria.federation.%s.federation" % network

    messages.each do |data|
      Log.debug("Sending a federated direct message via NATS target '%s' for message type %s" % [network_target, msg.type])

      connection.publish(network_target, data, target[:headers]["reply-to"])
    end
  end
end
#receive ⇒ Message
Receives a message from the middleware, blocking until one is received
369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 |
# File 'lib/mcollective/connector/nats.rb', line 369
#
# Receives a message from the middleware, looping until a usable one arrives.
#
# Data that is not JSON is logged and skipped. Valid JSON that is not a
# transport envelope (not a Hash, or without a "headers" Hash) is treated the
# same way; previously such input either looped silently (null/false) or
# raised on the header access below.
#
# @return [Message] built from the envelope's data and headers, flagged as base64
def receive
  msg = nil

  until msg
    received = connection.receive

    begin
      parsed = JSON.parse(received)
    rescue StandardError
      Log.warn("Got non JSON data from the broker: %s" % [received])
      next
    end

    if parsed.is_a?(Hash) && parsed["headers"].is_a?(Hash)
      msg = parsed
    else
      Log.warn("Got a malformed transport message from the broker: %s" % [received])
    end
  end

  # record the server the message arrived through and who received it
  msg["headers"]["seen-by"] << [connected_server.to_s, @config.identity] if msg["headers"].include?("seen-by")

  Message.new(msg["data"], msg, :base64 => true, :headers => msg["headers"])
end
#server_list ⇒ Array<String>
Retrieves the list of server and port combos to attempt to connect to
Configured servers are checked first, then SRV records, and finally it falls back to puppet:4222
396 397 398 399 400 401 402 |
# File 'lib/mcollective/connector/nats.rb', line 396
#
# Retrieves the list of server and port combos to attempt to connect to.
#
# Asks choria for the middleware servers with "puppet" and "4222" as the
# fallback host and port, turns each pair into a nats:// URI, adds credentials
# and returns the URIs as strings.
#
# @return [Array<String>]
def server_list
  endpoints = choria.middleware_servers("puppet", "4222")

  uris = endpoints.map do |host, port|
    URI("nats://%s:%s" % [host, port])
  end

  decorated = decorate_servers_with_users(uris)
  decorated.map { |uri| uri.to_s }
end
#stats ⇒ Hash
Retrieves the NATS connection stats
35 36 37 |
# File 'lib/mcollective/connector/nats.rb', line 35
#
# Retrieves the NATS connection stats from the connection wrapper.
#
# @return [Hash]
def stats
  connection_stats = connection.stats
  connection_stats
end
#subscribe(agent, type, collective) ⇒ void
This method returns an undefined value.
Subscribes to the topics/queues needed for a particular agent
359 360 361 362 363 |
# File 'lib/mcollective/connector/nats.rb', line 359
#
# Subscribes to the topics/queues needed for a particular agent.
#
# @param agent [String] agent name
# @param type [Symbol] target type, see make_target
# @param collective [String] collective name
# @return [void]
def subscribe(agent, type, collective)
  subject = make_target(agent, type, collective)

  connection.subscribe(subject)
end
#target_for(msg, identity = nil) ⇒ Hash
Create a target structure for a message
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/mcollective/connector/nats.rb', line 167
#
# Create a target structure for a message: the name to publish to plus the
# headers that go with it.
#
# Replies go to the reply-to header of the request; requests and direct
# requests get a target made from agent, type, collective and identity.
#
# @param msg [Object] message to create the target for
# @param identity [String, nil] node identity, passed on to make_target
# @return [Hash] with :name and :headers keys
# @raise [RuntimeError] for a reply without reply-to header or an unsupported message type
def target_for(msg, identity=nil)
  name =
    if msg.type == :reply
      reply_to = msg.request.headers["reply-to"]
      raise("Do not know how to reply, no reply-to header has been set on message %s" % msg.requestid) unless reply_to

      reply_to
    elsif [:request, :direct_request].include?(msg.type)
      make_target(msg.agent, msg.type, msg.collective, identity)
    else
      raise("Don't now how to create a target for message type %s" % msg.type)
    end

  # headers are only computed once the target name is known to be valid
  headers = {}
  headers.merge!(headers_for(msg))

  {:name => name, :headers => headers}
end
#unsubscribe(agent, type, collective) ⇒ void
This method returns an undefined value.
Unsubscribe from the target for a agent
345 346 347 348 349 350 |
# File 'lib/mcollective/connector/nats.rb', line 345
#
# Unsubscribe from the target for an agent.
#
# @param agent [String] agent name
# @param type [Symbol] target type, see make_target
# @param collective [String] collective name
# @return [void]
def unsubscribe(agent, type, collective)
  subject = make_target(agent, type, collective)

  Log.debug("Unsubscribing from %s" % subject)

  connection.unsubscribe(subject)
end