Class: RubyNestNats::Client

Inherits:
Object
Defined in:
lib/ruby_nest_nats/client.rb

Overview

The RubyNestNats::Client class provides a basic interface for subscribing to messages by subject and queue, and for replying to those messages. It can also log most of its activity if a logger is set.

Class Attribute Summary

Class Method Summary

Class Attribute Details

.default_queue ⇒ Object

:nodoc:



# File 'lib/ruby_nest_nats/client.rb', line 14

def default_queue
  @default_queue
end

.logger ⇒ Object

:nodoc:



# File 'lib/ruby_nest_nats/client.rb', line 14

def logger
  @logger
end

Class Method Details

.reply_to(subject, queue: nil, &block) ⇒ Object

Register a message handler with the RubyNestNats::Client::reply_to method. Pass a subject string as the first argument (either a static subject string or a pattern that matches more than one subject). Optionally specify a queue with the queue: option. If you don’t provide the queue: option, the queue is set to the value of default_queue, or to nil (no queue) if a default queue hasn’t been set.

The result of the given block is published in reply to the message. When a message matching the subject is received, the block is passed two arguments: data and subject. The data argument is the payload of the message (JSON objects and arrays are parsed into string-keyed Hash objects and Array objects, respectively). The subject argument is the subject of the message received (mainly useful when a pattern was specified instead of a static subject string).

Examples:

RubyNestNats::Client.reply_to("some.subject", queue: "foobar") { |data| "Got it! #{data.inspect}" }

RubyNestNats::Client.reply_to("some.*.pattern") { |data, subject| "Got #{data} on #{subject}" }

RubyNestNats::Client.reply_to("other.subject") do |data|
  if data["foo"] == "bar"
    { is_bar: "Yep!" }
  else
    { is_bar: "No way!" }
  end
end

RubyNestNats::Client.reply_to("subject.in.queue", queue: "barbaz") do
  "My turn!"
end
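
The examples above declare two, one, or zero block parameters. A minimal sketch of why all three forms work, assuming the handler is invoked as an ordinary (non-lambda) block with data and subject: the invoke_handler helper below is hypothetical and is not the gem's code, and the fallback to the raw string for non-JSON payloads is also an assumption.

```ruby
require "json"

# Hypothetical stand-in for how a handler block might be invoked.
# Assumption: non-JSON payloads are handed to the block as the raw string.
def invoke_handler(raw_payload, subject, &handler)
  data = begin
    JSON.parse(raw_payload) # JSON objects become string-keyed Hashes
  rescue JSON::ParserError
    raw_payload
  end
  # A plain block silently ignores arguments it does not declare.
  handler.call(data, subject)
end

two_args = invoke_handler('{"foo":"bar"}', "some.a.pattern") do |data, subject|
  "Got #{data.inspect} on #{subject}"
end

one_arg = invoke_handler('{"foo":"bar"}', "other.subject") do |data|
  data["foo"] == "bar" ? { is_bar: "Yep!" } : { is_bar: "No way!" }
end

no_args = invoke_handler("hello", "subject.in.queue") { "My turn!" }

puts two_args
puts one_arg.inspect
puts no_args
```

Because the payload arrives as a string-keyed Hash, the handler reads data["foo"] rather than data[:foo].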


# File 'lib/ruby_nest_nats/client.rb', line 101

def reply_to(subject, queue: nil, &block)
  queue = Utils.presence(queue) || default_queue
  queue_desc = " in queue '#{queue}'" if queue
  log("Registering a reply handler for subject '#{subject}'#{queue_desc}", level: :debug)
  register_reply!(subject: subject.to_s, handler: block, queue: queue.to_s)
end
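
The queue fallback in the first line of reply_to can be sketched in isolation. The source of Utils.presence is not shown on this page, so the presence helper below is a hypothetical stand-in that assumes it returns nil for nil or blank strings; resolve_queue is likewise an illustrative name.

```ruby
# Hypothetical stand-in for Utils.presence (assumed behavior):
# nil for nil or blank strings, the value itself otherwise.
def presence(value)
  return nil if value.nil?
  return nil if value.respond_to?(:strip) && value.strip.empty?

  value
end

# Mirrors `Utils.presence(queue) || default_queue` from the listing above.
def resolve_queue(queue, default_queue)
  presence(queue) || default_queue
end

puts resolve_queue("foobar", "workers").inspect # explicit queue wins
puts resolve_queue(nil, "workers").inspect      # falls back to the default
puts resolve_queue("  ", nil).inspect           # no queue at all
```

Note that the listing then passes queue.to_s to register_reply!, and nil.to_s is an empty string in Ruby. How register_reply! treats an empty queue name is not shown here.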

.start! ⇒ Object

Start listening for messages with the RubyNestNats::Client::start! method. This spins up a background thread (so the call itself does not block) that subscribes to subjects (as specified by invocation(s) of ::reply_to) and waits for messages to come in. When a message is received, the appropriate ::reply_to block is used to compute a response, and that response is published.

NOTE: If an error is raised in one of the handlers, RubyNestNats::Client will restart automatically.

NOTE: You can invoke ::reply_to to create additional message subscriptions after RubyNestNats::Client.start!, but each ::reply_to invoked after ::start! forces the client to restart and re-subscribe. You may see (benign, already-handled) errors in the logs when this restart happens. If you have many additional ::reply_to invocations, consider refactoring so that your call to RubyNestNats::Client.start! occurs after them.

NOTE: The ::start! method can safely be called multiple times. Only the first call is honored; subsequent calls made while the client is already started do nothing except log at the DEBUG level (including a “NATS is already running” message).

Examples:

RubyNestNats::Client.start!
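
To illustrate the restart cost described in the second NOTE, here is a toy model that only counts restarts. RestartCounter is a hypothetical stand-in written for this example; it does not connect to NATS and is not part of the gem.

```ruby
# Hypothetical stand-in: tracks how many restarts late registrations cause.
class RestartCounter
  attr_reader :restarts, :subjects

  def initialize
    @started = false
    @restarts = 0
    @subjects = []
  end

  def reply_to(subject)
    @subjects << subject
    # Per the NOTE: each registration after start! forces a restart.
    @restarts += 1 if @started
  end

  def start!
    @started = true
  end
end

subjects = %w[a.subject b.subject c.subject]

late = RestartCounter.new
late.start!
subjects.each { |s| late.reply_to(s) }

early = RestartCounter.new
subjects.each { |s| early.reply_to(s) }
early.start!

puts "registering after start!:  #{late.restarts} restarts"
puts "registering before start!: #{early.restarts} restarts"
```

Both orderings end up with the same subscriptions; only the number of restarts differs, which is why registering handlers before ::start! is preferable when there are many of them.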


# File 'lib/ruby_nest_nats/client.rb', line 136

def start!
  log("Starting NATS", level: :debug)

  if started?
    log("NATS is already running", level: :debug)
    return
  end

  started!

  self.current_thread = Thread.new do
    Thread.handle_interrupt(StandardError => :never) do
      Thread.handle_interrupt(StandardError => :immediate) { listen }
    rescue NATS::ConnectError => e
      log("Could not connect to NATS server:", level: :error)
      log(e.full_message, level: :error, indent: 2)
      Thread.current.exit
    rescue NewSubscriptionsError => e
      log("New subscriptions! Restarting...", level: :info)
      restart!
      raise e # TODO: there has to be a better way
    rescue StandardError => e
      log("Encountered an error:", level: :error)
      log(e.full_message, level: :error, indent: 2)
      restart!
      raise e
    end
  end
end
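
The started? guard at the top of start! is what makes repeated calls harmless. A minimal sketch of that pattern on its own, assuming nothing about NATS: OnceStarter and its log array are hypothetical names used only for illustration, and the error handling and restart logic from the listing are deliberately left out.

```ruby
# Hypothetical stand-in showing only the "start once" guard.
class OnceStarter
  attr_reader :thread, :log

  def initialize
    @started = false
    @log = []
  end

  def started?
    @started
  end

  # Spawns the worker thread on the first call; later calls only log.
  def start!(&work)
    if started?
      @log << "already running"
      return
    end

    @started = true
    @thread = Thread.new(&work)
  end
end

starter = OnceStarter.new
first  = starter.start! { :listening }
second = starter.start! { :never_runs }

puts first.value.inspect # value returned by the first worker block
puts second.inspect      # the second call returned early
puts starter.log.inspect
```

The flag is set before the thread is created, matching the order of started! and Thread.new in the listing, so a second call made immediately afterwards still sees the client as started.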

.started? ⇒ Boolean

Returns true if ::start! has already been called (meaning the client is listening to NATS messages). Returns false if it has not yet been called, or if it has been stopped.

Returns:

  • (Boolean)


# File 'lib/ruby_nest_nats/client.rb', line 58

def started?
  @started ||= false
end

.stopped? ⇒ Boolean

Opposite of ::started?: returns false if ::start! has already been called (meaning the client is listening to NATS messages). Returns true if it has not yet been called, or if it has been stopped.

Returns:

  • (Boolean)


# File 'lib/ruby_nest_nats/client.rb', line 65

def stopped?
  !started?
end