Class: NatsStreamingListener::StreamingClient

Inherits:
NatsListenerCore::AbstractClient
  • Object
show all
Defined in:
lib/nats_streaming_listener/streaming_client.rb

Overview

Implementation of Nats-streaming client

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ StreamingClient

Use this opts:



16
17
18
19
20
21
# File 'lib/nats_streaming_listener/streaming_client.rb', line 16

def initialize(opts = {})
  @nats = STAN::Client.new unless opts[:disable_nats] # Create nats client
  @logger =  NatsListenerCore::ClientLogger.new(opts)
  @skip = opts[:skip] || false
  @client_catcher =  NatsListenerCore::ClientCatcher.new(opts)
end

Instance Attribute Details

#:catch_errors - used to catch errors around subscribers/connections(be careful with it!)(: catch_errors-usedtocatcherrorsaroundsubscribers/connections(be careful with it!)) ⇒ Object

Use this opts:



16
17
18
19
20
21
# File 'lib/nats_streaming_listener/streaming_client.rb', line 16

def initialize(opts = {})
  @nats = STAN::Client.new unless opts[:disable_nats] # Create nats client
  @logger =  NatsListenerCore::ClientLogger.new(opts)
  @skip = opts[:skip] || false
  @client_catcher =  NatsListenerCore::ClientCatcher.new(opts)
end

#:catch_provider - this class will be called with catch_provider.error(e)(: catch_provider-this) ⇒ Object

Use this opts:



16
17
18
19
20
21
# File 'lib/nats_streaming_listener/streaming_client.rb', line 16

def initialize(opts = {})
  @nats = STAN::Client.new unless opts[:disable_nats] # Create nats client
  @logger =  NatsListenerCore::ClientLogger.new(opts)
  @skip = opts[:skip] || false
  @client_catcher =  NatsListenerCore::ClientCatcher.new(opts)
end

#:client_id - current service client id(optional)(: client_id-currentserviceclientid(optional)) ⇒ Object

Use this opts for connection:



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/nats_streaming_listener/streaming_client.rb', line 29

def establish_connection(config = {})
  return if skip

  @config = config.to_h
  begin
    # Connect nats to provided configuration
    nats.connect(cluster_name, client_name, config)
    true
  rescue StandardError => exception
    log(action: :connection_failed, message: exception)
    false
  end
end

#:cluster_name - name of nats-streaming cluster that we connect to(: cluster_name-nameofnats-streamingclusterthatweconnectto) ⇒ Object

Use this opts for connection:



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/nats_streaming_listener/streaming_client.rb', line 29

def establish_connection(config = {})
  return if skip

  @config = config.to_h
  begin
    # Connect nats to provided configuration
    nats.connect(cluster_name, client_name, config)
    true
  rescue StandardError => exception
    log(action: :connection_failed, message: exception)
    false
  end
end

#:disable_nats - if something is passed to that attribute - nats won't be initialized(: disable_nats-) ⇒ Object

Use this opts:



16
17
18
19
20
21
# File 'lib/nats_streaming_listener/streaming_client.rb', line 16

def initialize(opts = {})
  @nats = STAN::Client.new unless opts[:disable_nats] # Create nats client
  @logger =  NatsListenerCore::ClientLogger.new(opts)
  @skip = opts[:skip] || false
  @client_catcher =  NatsListenerCore::ClientCatcher.new(opts)
end

#:logger - logger used in this service(: logger-loggerused) ⇒ Object

Use this opts:



16
17
18
19
20
21
# File 'lib/nats_streaming_listener/streaming_client.rb', line 16

def initialize(opts = {})
  @nats = STAN::Client.new unless opts[:disable_nats] # Create nats client
  @logger =  NatsListenerCore::ClientLogger.new(opts)
  @skip = opts[:skip] || false
  @client_catcher =  NatsListenerCore::ClientCatcher.new(opts)
end

#:nats - nats connection info(example: ```{servers: 'nats://127.0.0.1:4223'}```)(: nats-natsconnectioninfo(example: ```{servers: 'nats://127.0.0.1:4223'}```)) ⇒ Object

Use this opts for connection:



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/nats_streaming_listener/streaming_client.rb', line 29

def establish_connection(config = {})
  return if skip

  @config = config.to_h
  begin
    # Connect nats to provided configuration
    nats.connect(cluster_name, client_name, config)
    true
  rescue StandardError => exception
    log(action: :connection_failed, message: exception)
    false
  end
end

#:service_name - name of current service(: service_name-nameofcurrentservice) ⇒ Object

Use this opts for connection:



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/nats_streaming_listener/streaming_client.rb', line 29

def establish_connection(config = {})
  return if skip

  @config = config.to_h
  begin
    # Connect nats to provided configuration
    nats.connect(cluster_name, client_name, config)
    true
  rescue StandardError => exception
    log(action: :connection_failed, message: exception)
    false
  end
end

#:skip - flag attribute used to skip connections(useful for testing)(: skip-flagattributeusedtoskipconnections(useful) ⇒ Object

Use this opts:



16
17
18
19
20
21
# File 'lib/nats_streaming_listener/streaming_client.rb', line 16

def initialize(opts = {})
  @nats = STAN::Client.new unless opts[:disable_nats] # Create nats client
  @logger =  NatsListenerCore::ClientLogger.new(opts)
  @skip = opts[:skip] || false
  @client_catcher =  NatsListenerCore::ClientCatcher.new(opts)
end

Instance Method Details

#client_nameObject



43
44
45
# File 'lib/nats_streaming_listener/streaming_client.rb', line 43

def client_name
  "#{service_name}-#{config.fetch(:client_id) { :client_id }}"
end

#cluster_nameObject



47
48
49
# File 'lib/nats_streaming_listener/streaming_client.rb', line 47

def cluster_name
  config.fetch(:cluster_name) { :cluster_name }
end

#disconnected?Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/nats_streaming_listener/streaming_client.rb', line 58

def disconnected?
  nats&.nats&.status.to_i.zero?
end

#establish_connection(config = {}) ⇒ Object

Use this opts for connection:



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/nats_streaming_listener/streaming_client.rb', line 29

def establish_connection(config = {})
  return if skip

  @config = config.to_h
  begin
    # Connect nats to provided configuration
    nats.connect(cluster_name, client_name, config)
    true
  rescue StandardError => exception
    log(action: :connection_failed, message: exception)
    false
  end
end

#reestablish_connectionObject



62
63
64
# File 'lib/nats_streaming_listener/streaming_client.rb', line 62

def reestablish_connection
  establish_connection(config) if disconnected?
end

#request(subject, message, opts = {}) ⇒ Object



51
52
53
54
55
56
# File 'lib/nats_streaming_listener/streaming_client.rb', line 51

def request(subject, message, opts = {})
  with_connection do
    log(action: :request, message: message)
    nats.request(subject, message, opts)
  end
end