Class: MCollective::Util::NatsWrapper
- Inherits:
-
Object
- Object
- MCollective::Util::NatsWrapper
- Defined in:
- lib/mcollective/util/natswrapper.rb
Overview
A wrapper class around the Pure Ruby NATS gem
MCollective has some non compatible expectations about how message flow works such as having a blocking receive and publish method it calls when it likes, while typical flow is to pass a block and then callbacks will be called.
This wrapper bridges the 2 worlds using ruby Queues to simulate the blocking receive expectation MCollective has thanks to its initial design around the Stomp gem.
Instance Attribute Summary collapse
-
#received_queue ⇒ Object
readonly
Returns the value of attribute received_queue.
-
#subscriptions ⇒ Object
readonly
Returns the value of attribute subscriptions.
Instance Method Summary collapse
-
#active_options ⇒ Hash
Connection options from the NATS gem.
-
#backoff_sleep ⇒ void
Does a backoff sleep up to 2 seconds.
-
#client_flavour ⇒ String
Client library flavour.
-
#client_version ⇒ String
Client library version.
-
#connected? ⇒ Boolean
Is NATS connected.
-
#connected_server ⇒ String?
Retrieves the current connected server.
-
#has_client? ⇒ Boolean
Is there a NATS client created.
-
#initialize ⇒ NatsWrapper
constructor
A new instance of NatsWrapper.
-
#log_nats_pool ⇒ void
Logs the NATS server pool for nats-pure.
-
#publish(destination, payload, reply = nil) ⇒ Object
Public a message.
-
#receive ⇒ String
Receives a message from the receive queue.
-
#start(options = {}) ⇒ Object
Starts the EM based NATS connection.
-
#started? ⇒ Boolean
Has the NATS connection started.
-
#stats ⇒ Hash
Connection stats from the NATS gem.
-
#stop ⇒ Object
Stops the NATS connection.
-
#stub_client(client) ⇒ Object
Test helper.
-
#subscribe(source_name, options = {}) ⇒ Object
Subscribes to a message source.
-
#unsubscribe(source_name) ⇒ Object
Unsubscribes from a message source.
Constructor Details
#initialize ⇒ NatsWrapper
Returns a new instance of NatsWrapper.
18 19 20 21 22 23 24 |
# File 'lib/mcollective/util/natswrapper.rb', line 18 def initialize @received_queue = Queue.new @subscriptions = {} @subscription_mutex = Mutex.new @started = false @client = NATS::IO::Client.new end |
Instance Attribute Details
#received_queue ⇒ Object (readonly)
Returns the value of attribute received_queue.
16 17 18 |
# File 'lib/mcollective/util/natswrapper.rb', line 16 def received_queue @received_queue end |
#subscriptions ⇒ Object (readonly)
Returns the value of attribute subscriptions.
16 17 18 |
# File 'lib/mcollective/util/natswrapper.rb', line 16 def subscriptions @subscriptions end |
Instance Method Details
#active_options ⇒ Hash
Connection options from the NATS gem
75 76 77 78 79 |
# File 'lib/mcollective/util/natswrapper.rb', line 75 def return {} unless has_client? @client. end |
#backoff_sleep ⇒ void
This method returns an undefined value.
Does a backoff sleep up to 2 seconds
91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/mcollective/util/natswrapper.rb', line 91 def backoff_sleep @backoffcount ||= 1 if @backoffcount >= 50 sleep(2) else sleep(0.04 * @backoffcount) end @backoffcount += 1 end |
#client_flavour ⇒ String
Client library flavour
61 62 63 |
# File 'lib/mcollective/util/natswrapper.rb', line 61 def client_flavour "nats-pure" end |
#client_version ⇒ String
Client library version
68 69 70 |
# File 'lib/mcollective/util/natswrapper.rb', line 68 def client_version NATS::IO::VERSION end |
#connected? ⇒ Boolean
Is NATS connected
84 85 86 |
# File 'lib/mcollective/util/natswrapper.rb', line 84 def connected? has_client? && @client.connected? end |
#connected_server ⇒ String?
Retrieves the current connected server
43 44 45 46 47 |
# File 'lib/mcollective/util/natswrapper.rb', line 43 def connected_server return nil unless connected? @client.connected_server end |
#has_client? ⇒ Boolean
Is there a NATS client created
36 37 38 |
# File 'lib/mcollective/util/natswrapper.rb', line 36 def has_client? !!@client end |
#log_nats_pool ⇒ void
This method returns an undefined value.
Logs the NATS server pool for nats-pure
The current server pool is dynamic as the NATS servers can announce new cluster members as they join the pool, little helper for logging the pool on major events
110 111 112 113 114 115 116 117 118 |
# File 'lib/mcollective/util/natswrapper.rb', line 110 def log_nats_pool return unless has_client? servers = @client.server_pool.map do |server| server[:uri].to_s end Log.info("Current server pool: %s" % servers.join(", ")) end |
#publish(destination, payload, reply = nil) ⇒ Object
Public a message
194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/mcollective/util/natswrapper.rb', line 194 def publish(destination, payload, reply=nil) server_state = "%s %s" % [connected? ? "connected" : "disconnected", @client.connected_server] if reply Log.debug("Publishing to %s reply to %s via %s" % [destination, reply, server_state]) else Log.debug("Publishing to %s via %s" % [destination, server_state]) end @client.publish(destination, payload, reply) end |
#receive ⇒ String
Receives a message from the receive queue
This will block until a message is available
185 186 187 |
# File 'lib/mcollective/util/natswrapper.rb', line 185 def receive @received_queue.pop end |
#start(options = {}) ⇒ Object
Starts the EM based NATS connection
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/mcollective/util/natswrapper.rb', line 123 def start(={}) # Client connects pretty much soon as it's initialized which is very early # and some applications like 'request_cert' just doesnt need/want a client # since for example there won't be SSL stuff yet, so if a application calls # disconnect very early on this should avoid that chicken and egg return if @force_Stop @client.on_reconnect do Log.warn("Reconnected after connection failure: %s" % connected_server) log_nats_pool @backoffcount = 1 end @client.on_disconnect do |error| if error Log.warn("Disconnected from NATS: %s: %s" % [error.class, error.to_s]) else Log.info("Disconnected from NATS for an unknown reason") end end @client.on_error do |error| Log.error("Error in NATS connection: %s: %s" % [error.class, error.to_s]) end @client.on_close do Log.info("Connection to NATS server closed") end begin @client.connect() rescue ClientTimeoutError raise rescue Log.error("Error during initial NATS setup: %s: %s" % [$!.class, $!.]) Log.debug($!.backtrace.join("\n\t")) sleep 1 Log.error("Retrying NATS initial setup") retry end sleep(0.01) until connected? @started = true nil end |
#started? ⇒ Boolean
Has the NATS connection started
29 30 31 |
# File 'lib/mcollective/util/natswrapper.rb', line 29 def started? @started end |
#stats ⇒ Hash
Connection stats from the NATS gem
52 53 54 55 56 |
# File 'lib/mcollective/util/natswrapper.rb', line 52 def stats return {} unless has_client? @client.stats end |
#stop ⇒ Object
Stops the NATS connection
175 176 177 178 |
# File 'lib/mcollective/util/natswrapper.rb', line 175 def stop @force_stop = true @client.close end |
#stub_client(client) ⇒ Object
Test helper
237 238 239 |
# File 'lib/mcollective/util/natswrapper.rb', line 237 def stub_client(client) @client = client end |
#subscribe(source_name, options = {}) ⇒ Object
Subscribes to a message source
210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/mcollective/util/natswrapper.rb', line 210 def subscribe(source_name, ={}) @subscription_mutex.synchronize do Log.debug("Subscribing to %s" % source_name) unless @subscriptions.include?(source_name) @subscriptions[source_name] = @client.subscribe(source_name, ) do |msg, _, _| @received_queue << msg end end end end |
#unsubscribe(source_name) ⇒ Object
Unsubscribes from a message source
225 226 227 228 229 230 231 232 |
# File 'lib/mcollective/util/natswrapper.rb', line 225 def unsubscribe(source_name) @subscription_mutex.synchronize do if @subscriptions.include?(source_name) @client.unsubscribe(@subscriptions[source_name]) @subscriptions.delete(source_name) end end end |