Class: NatsStreamingListener::StreamingClient
- Inherits:
-
NatsListenerCore::AbstractClient
- Object
- NatsListenerCore::AbstractClient
- NatsStreamingListener::StreamingClient
- Defined in:
- lib/nats_streaming_listener/streaming_client.rb
Overview
Implementation of Nats-streaming client
Instance Attribute Summary collapse
-
#:catch_errors - used to catch errors around subscribers/connections(be careful with it!)(: catch_errors-usedtocatcherrorsaroundsubscribers/connections(be careful with it!)) ⇒ Object
Use this opts:.
-
#:catch_provider - this class will be called with catch_provider.error(e)(: catch_provider-this) ⇒ Object
Use this opts:.
-
#:client_id - current service client id(optional)(: client_id-currentserviceclientid(optional)) ⇒ Object
Use this opts for connection:.
-
#:cluster_name - name of nats-streaming cluster that we connect to(: cluster_name-nameofnats-streamingclusterthatweconnectto) ⇒ Object
Use this opts for connection:.
-
#:disable_nats - if something is passed to that attribute - nats won't be initialized(: disable_nats-) ⇒ Object
Use this opts:.
-
#:logger - logger used in this service(: logger-loggerused) ⇒ Object
Use this opts:.
-
#:nats - nats connection info(example: ```{servers: 'nats://127.0.0.1:4223'}```)(: nats-natsconnectioninfo(example: ```{servers: 'nats://127.0.0.1:4223'}```)) ⇒ Object
Use this opts for connection:.
-
#:service_name - name of current service(: service_name-nameofcurrentservice) ⇒ Object
Use this opts for connection:.
-
#:skip - flag attribute used to skip connections(useful for testing)(: skip-flagattributeusedtoskipconnections(useful) ⇒ Object
Use this opts:.
Instance Method Summary collapse
- #client_name ⇒ Object
- #cluster_name ⇒ Object
- #disconnected? ⇒ Boolean
-
#establish_connection(config = {}) ⇒ Object
Use this opts for connection:.
-
#initialize(opts = {}) ⇒ StreamingClient
constructor
Use this opts:.
- #reestablish_connection ⇒ Object
- #request(subject, message, opts = {}) ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ StreamingClient
Use this opts:
16 17 18 19 20 21 |
# File 'lib/nats_streaming_listener/streaming_client.rb', line 16 def initialize(opts = {}) @nats = STAN::Client.new unless opts[:disable_nats] # Create nats client @logger = NatsListenerCore::ClientLogger.new(opts) @skip = opts[:skip] || false @client_catcher = NatsListenerCore::ClientCatcher.new(opts) end |
Instance Attribute Details
#:catch_errors - used to catch errors around subscribers/connections(be careful with it!)(: catch_errors-usedtocatcherrorsaroundsubscribers/connections(be careful with it!)) ⇒ Object
Use this opts:
16 17 18 19 20 21 |
# File 'lib/nats_streaming_listener/streaming_client.rb', line 16 def initialize(opts = {}) @nats = STAN::Client.new unless opts[:disable_nats] # Create nats client @logger = NatsListenerCore::ClientLogger.new(opts) @skip = opts[:skip] || false @client_catcher = NatsListenerCore::ClientCatcher.new(opts) end |
#:catch_provider - this class will be called with catch_provider.error(e)(: catch_provider-this) ⇒ Object
Use this opts:
16 17 18 19 20 21 |
# File 'lib/nats_streaming_listener/streaming_client.rb', line 16 def initialize(opts = {}) @nats = STAN::Client.new unless opts[:disable_nats] # Create nats client @logger = NatsListenerCore::ClientLogger.new(opts) @skip = opts[:skip] || false @client_catcher = NatsListenerCore::ClientCatcher.new(opts) end |
#:client_id - current service client id(optional)(: client_id-currentserviceclientid(optional)) ⇒ Object
Use this opts for connection:
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/nats_streaming_listener/streaming_client.rb', line 29 def establish_connection(config = {}) return if skip @config = config.to_h begin # Connect nats to provided configuration nats.connect(cluster_name, client_name, config) true rescue StandardError => exception log(action: :connection_failed, message: exception) false end end |
#:cluster_name - name of nats-streaming cluster that we connect to(: cluster_name-nameofnats-streamingclusterthatweconnectto) ⇒ Object
Use this opts for connection:
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/nats_streaming_listener/streaming_client.rb', line 29 def establish_connection(config = {}) return if skip @config = config.to_h begin # Connect nats to provided configuration nats.connect(cluster_name, client_name, config) true rescue StandardError => exception log(action: :connection_failed, message: exception) false end end |
#:disable_nats - if something is passed to that attribute - nats won't be initialized(: disable_nats-) ⇒ Object
Use this opts:
16 17 18 19 20 21 |
# File 'lib/nats_streaming_listener/streaming_client.rb', line 16 def initialize(opts = {}) @nats = STAN::Client.new unless opts[:disable_nats] # Create nats client @logger = NatsListenerCore::ClientLogger.new(opts) @skip = opts[:skip] || false @client_catcher = NatsListenerCore::ClientCatcher.new(opts) end |
#:logger - logger used in this service(: logger-loggerused) ⇒ Object
Use this opts:
16 17 18 19 20 21 |
# File 'lib/nats_streaming_listener/streaming_client.rb', line 16 def initialize(opts = {}) @nats = STAN::Client.new unless opts[:disable_nats] # Create nats client @logger = NatsListenerCore::ClientLogger.new(opts) @skip = opts[:skip] || false @client_catcher = NatsListenerCore::ClientCatcher.new(opts) end |
#:nats - nats connection info(example: ```{servers: 'nats://127.0.0.1:4223'}```)(: nats-natsconnectioninfo(example: ```{servers: 'nats://127.0.0.1:4223'}```)) ⇒ Object
Use this opts for connection:
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/nats_streaming_listener/streaming_client.rb', line 29 def establish_connection(config = {}) return if skip @config = config.to_h begin # Connect nats to provided configuration nats.connect(cluster_name, client_name, config) true rescue StandardError => exception log(action: :connection_failed, message: exception) false end end |
#:service_name - name of current service(: service_name-nameofcurrentservice) ⇒ Object
Use this opts for connection:
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/nats_streaming_listener/streaming_client.rb', line 29 def establish_connection(config = {}) return if skip @config = config.to_h begin # Connect nats to provided configuration nats.connect(cluster_name, client_name, config) true rescue StandardError => exception log(action: :connection_failed, message: exception) false end end |
#:skip - flag attribute used to skip connections(useful for testing)(: skip-flagattributeusedtoskipconnections(useful) ⇒ Object
Use this opts:
16 17 18 19 20 21 |
# File 'lib/nats_streaming_listener/streaming_client.rb', line 16 def initialize(opts = {}) @nats = STAN::Client.new unless opts[:disable_nats] # Create nats client @logger = NatsListenerCore::ClientLogger.new(opts) @skip = opts[:skip] || false @client_catcher = NatsListenerCore::ClientCatcher.new(opts) end |
Instance Method Details
#client_name ⇒ Object
43 44 45 |
# File 'lib/nats_streaming_listener/streaming_client.rb', line 43 def client_name "#{service_name}-#{config.fetch(:client_id) { :client_id }}" end |
#cluster_name ⇒ Object
47 48 49 |
# File 'lib/nats_streaming_listener/streaming_client.rb', line 47 def cluster_name config.fetch(:cluster_name) { :cluster_name } end |
#disconnected? ⇒ Boolean
58 59 60 |
# File 'lib/nats_streaming_listener/streaming_client.rb', line 58 def disconnected? nats&.nats&.status.to_i.zero? end |
#establish_connection(config = {}) ⇒ Object
Use this opts for connection:
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/nats_streaming_listener/streaming_client.rb', line 29 def establish_connection(config = {}) return if skip @config = config.to_h begin # Connect nats to provided configuration nats.connect(cluster_name, client_name, config) true rescue StandardError => exception log(action: :connection_failed, message: exception) false end end |
#reestablish_connection ⇒ Object
62 63 64 |
# File 'lib/nats_streaming_listener/streaming_client.rb', line 62 def reestablish_connection establish_connection(config) if disconnected? end |
#request(subject, message, opts = {}) ⇒ Object
51 52 53 54 55 56 |
# File 'lib/nats_streaming_listener/streaming_client.rb', line 51 def request(subject, , opts = {}) with_connection do log(action: :request, message: ) nats.request(subject, , opts) end end |