Module: Protobuf::Nats

Defined in:
lib/protobuf/nats.rb,
lib/protobuf/nats/jnats.rb,
lib/protobuf/nats/client.rb,
lib/protobuf/nats/config.rb,
lib/protobuf/nats/errors.rb,
lib/protobuf/nats/runner.rb,
lib/protobuf/nats/server.rb,
lib/protobuf/nats/version.rb,
lib/protobuf/nats/thread_pool.rb

Defined Under Namespace

Modules: Errors, Messages

Classes: Client, Config, JNats, Runner, Server, ThreadPool

Constant Summary collapse

# NATS client class used by this module: ::Protobuf::Nats::JNats when
# JRUBY_VERSION is defined (i.e. running on JRuby), otherwise ::NATS::IO::Client.
NatsClient =
if defined? JRUBY_VERSION
  # The JNats implementation is only required on the JRuby branch.
  require "protobuf/nats/jnats"
  ::Protobuf::Nats::JNats
else
  ::NATS::IO::Client
end
# Mutex held by start_client_nats_connection while it creates the client connection.
GET_CONNECTED_MUTEX =
::Mutex.new
# Version string for Protobuf::Nats.
VERSION =
"0.10.8"

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.client_nats_connection ⇒ Object

Returns the value of attribute client_nats_connection.



18
19
20
# File 'lib/protobuf/nats.rb', line 18

# Reader for the @client_nats_connection instance variable.
def client_nats_connection
  @client_nats_connection
end

Class Method Details

.config ⇒ Object



35
36
37
38
39
40
41
# File 'lib/protobuf/nats.rb', line 35

# Returns the memoized ::Protobuf::Nats::Config for this module.
#
# On first use a new Config is built and `load_from_yml` is called on it;
# later calls return that same cached instance.
def self.config
  return @config if @config

  fresh_config = ::Protobuf::Nats::Config.new
  fresh_config.load_from_yml
  @config = fresh_config
end

.error_callbacks ⇒ Object

We will always log an error.



47
48
49
# File 'lib/protobuf/nats.rb', line 47

# Returns the memoized list of error callbacks.
#
# The list is seeded with a single lambda that hands the error to log_error,
# so an error is always logged.
def self.error_callbacks
  return @error_callbacks if @error_callbacks

  log_callback = lambda { |error| log_error(error) }
  @error_callbacks = [log_callback]
end

.log_error(error) ⇒ Object

This works with both Ruby and Java errors.



118
119
120
121
122
123
124
# File 'lib/protobuf/nats.rb', line 118

# Writes an error to the logger at error level.
#
# Logs the error's string form, then its class name, and finally its
# backtrace (joined with newlines) when the object responds to `backtrace`
# and that backtrace is an Array.
def self.log_error(error)
  [error, error.class].each { |detail| logger.error detail.to_s }
  return unless error.respond_to?(:backtrace)

  trace = error.backtrace
  logger.error trace.join("\n") if trace.is_a?(::Array)
end

.logger ⇒ Object



126
127
128
# File 'lib/protobuf/nats.rb', line 126

# Returns the logger provided by ::Protobuf::Logging.
def self.logger
  ::Protobuf::Logging.logger
end

.notify_error_callbacks(error) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
# File 'lib/protobuf/nats.rb', line 60

# Invokes every registered error callback with the given error.
#
# A StandardError raised by a callback is passed to log_error and does not
# stop the remaining callbacks from running. Always returns nil.
def self.notify_error_callbacks(error)
  error_callbacks.each do |handler|
    begin
      handler.call(error)
    rescue ::StandardError => handler_failure
      # Keep going: one misbehaving callback must not block the others.
      log_error(handler_failure)
    end
  end

  nil
end

.on_error(&block) ⇒ Object



54
55
56
57
58
# File 'lib/protobuf/nats.rb', line 54

# Registers a block to be called whenever an error is reported.
#
# The block must accept exactly one argument (the error). It is appended to
# error_callbacks.
#
# Raises ::ArgumentError when no block is given or when the block's arity is
# not 1. Always returns nil on success.
def self.on_error(&block)
  # Guard against a missing block first; otherwise `block.arity` would raise
  # NoMethodError on nil instead of the intended ArgumentError.
  unless block && block.arity == 1
    fail ::ArgumentError, "on_error requires a block that accepts exactly one argument"
  end

  error_callbacks << block
  nil
end

.start_client_nats_connection ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/protobuf/nats.rb', line 80

# Ensures the shared client NATS connection exists, creating it on first use.
#
# Returns true both when a connection is already stored and after a new one
# has been connected, flushed, wired with callbacks and stored in
# @client_nats_connection.
def self.start_client_nats_connection
  # Fast path: skip the mutex once a connection has been stored.
  return true if @client_nats_connection

  GET_CONNECTED_MUTEX.synchronize do
    # Check again now that the lock is held.
    unless @client_nats_connection
      # Disable publisher pending buffer on reconnect
      connect_options = config.connection_options.merge(:disable_reconnect_buffer => true)

      nats = NatsClient.new
      nats.connect(connect_options)

      # Ensure we have a valid connection to the NATS server.
      nats.flush(5)

      # Connection lifecycle events are logged as warnings.
      nats.on_disconnect { logger.warn("Client NATS connection was disconnected") }
      nats.on_reconnect { logger.warn("Client NATS connection was reconnected") }
      nats.on_close { logger.warn("Client NATS connection was closed") }

      # Errors reported by the client are fanned out to the registered callbacks.
      nats.on_error { |error| notify_error_callbacks(error) }

      @client_nats_connection = nats
    end

    true
  end
end

.subscription_key(service_klass, service_method) ⇒ Object



72
73
74
75
76
77
78
# File 'lib/protobuf/nats.rb', line 72

def self.subscription_key(service_klass, service_method)
  service_class_name = service_klass.name.underscore.gsub("/", ".")
  service_method_name = service_method.to_s.underscore

  subscription_key = "rpc.#{service_class_name}.#{service_method_name}"
  subscription_key = config.make_subscription_key_replacements(subscription_key)
end