Class: Protobuf::Nats::JNats

Inherits:
Object
  • Object
show all
Defined in:
lib/protobuf/nats/jnats.rb

Defined Under Namespace

Classes: Message

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ JNats

Returns a new instance of JNats.



23
24
25
26
27
28
29
30
31
# File 'lib/protobuf/nats/jnats.rb', line 23

# Builds an unconnected client with default state.
#
# All four event handlers start out as no-op lambdas, so they can be invoked
# before the caller registers real ones through #on_error, #on_reconnect,
# #on_disconnect or #on_close. No options are stored yet, and the table of
# subscription callbacks starts empty, paired with the mutex that guards it.
def initialize
  # Subscription callback table and its lock.
  @subz_mutex = ::Mutex.new
  @subz_cbs = {}

  # Filled in by the first call to #connect.
  @options = nil

  # Default no-op event handlers; only the error handler takes an argument.
  @on_close_cb = -> {}
  @on_disconnect_cb = -> {}
  @on_reconnect_cb = -> {}
  @on_error_cb = ->(error) {}
end

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



11
12
13
# File 'lib/protobuf/nats/jnats.rb', line 11

# Returns the current connection object.
#
# The value is assigned by #connect and reset to nil by #close, so it is nil
# whenever the client is not connected.
def connection
  @connection
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



11
12
13
# File 'lib/protobuf/nats/jnats.rb', line 11

# Returns the stored connection options.
#
# The value is nil after construction. #connect stores its argument with
# `||=`, so once set it keeps the options from the first #connect call.
def options
  @options
end

Instance Method Details

#close ⇒ Object

Do not depend on #close for a graceful disconnect.



77
78
79
80
81
82
83
84
# File 'lib/protobuf/nats/jnats.rb', line 77

# Tears down the connection, the supervisor and the consumer, best effort.
#
# Do not depend on #close for a graceful disconnect: each teardown step
# swallows any StandardError it raises (including the NoMethodError raised
# when the reference is already nil), so the remaining steps always run.
# All three references are cleared afterwards.
#
# @return [nil]
def close
  @connection.close rescue nil
  @connection = nil

  # The supervisor and consumer are presumably threads (they respond to
  # #kill); they are created outside this method - verify against the spawner.
  @supervisor.kill rescue nil
  @supervisor = nil

  @consumer.kill rescue nil
  # Drop the consumer reference too, so a killed consumer is not retained.
  @consumer = nil
end

#connect(options = {}) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/protobuf/nats/jnats.rb', line 33

# Configures a java-nats ConnectionFactory from +options+, opens the
# connection, creates the shared message channel and starts the supervisor
# and consumer.
#
# Option keys read here:
# * +:servers+ - one URI string or an array of them; defaults to
#   "nats://localhost:4222".
# * +:max_reconnect_attempts+ - passed straight to +setMaxReconnect+.
# * +:disable_reconnect_buffer+ - when truthy, the reconnect buffer size is
#   set to 1.
# * +:uses_tls+ - when truthy, an SSL context is built and installed.
#
# @param options [Hash] connection settings (see the keys above)
# @return [Object] the connection returned by the factory's +createConnection+
def connect(options = {})
  # Only the first call stores its options; the rest of this method keeps
  # using the +options+ argument of the current call, not @options.
  @options ||= options

  # Accept a single server or a list; each entry becomes a java.net.URI.
  servers = options[:servers] || ["nats://localhost:4222"]
  servers = [servers].flatten.map { |uri_string| java.net.URI.new(uri_string) }
  connection_factory = ::Java::IoNatsClient::ConnectionFactory.new
  connection_factory.setServers(servers)
  # NOTE(review): no default is applied here, so nil is passed through when the
  # key is absent - presumably callers merge in defaults first; confirm.
  connection_factory.setMaxReconnect(options[:max_reconnect_attempts])

  # Shrink the pending buffer to always raise an error and let the caller retry.
  if options[:disable_reconnect_buffer]
    connection_factory.setReconnectBufSize(1)
  end

  # Setup callbacks
  # The handlers are looked up through the instance variables on every event,
  # so handlers registered after #connect are still picked up. Only the
  # exception handler forwards its argument; the others are called without one.
  connection_factory.setDisconnectedCallback { |event| @on_disconnect_cb.call }
  connection_factory.setReconnectedCallback { |_event| @on_reconnect_cb.call }
  connection_factory.setClosedCallback { |_event| @on_close_cb.call }
  connection_factory.setExceptionHandler { |error| @on_error_cb.call(error) }

  # Setup ssl context if we're using tls
  # create_ssl_context is defined outside this section; which option keys it
  # reads is not visible here.
  if options[:uses_tls]
    ssl_context = create_ssl_context(options)
    connection_factory.setSecure(true)
    connection_factory.setSSLContext(ssl_context)
  end

  @connection = connection_factory.createConnection

  # We're going to spawn a consumer and supervisor
  # @work_queue is the channel #subscribe hands to block-based subscriptions.
  # The helper (spelled "spwan" here) is defined outside this section.
  @work_queue = @connection.createMsgChannel
  spwan_supervisor_and_consumer

  @connection
end

#flush(timeout_sec = 0.5) ⇒ Object



86
87
88
# File 'lib/protobuf/nats/jnats.rb', line 86

# Flushes the connection, waiting up to +timeout_sec+ seconds.
#
# The timeout is multiplied by 1000 before being handed to the connection's
# +flush+, so the underlying call receives milliseconds.
#
# @param timeout_sec [Numeric] timeout in seconds (default 0.5)
# @return [Object] whatever the connection's +flush+ returns
def flush(timeout_sec = 0.5)
  timeout_ms = timeout_sec * 1000
  connection.flush(timeout_ms)
end

#new_inbox ⇒ Object



138
139
140
# File 'lib/protobuf/nats/jnats.rb', line 138

# Generates a fresh inbox subject.
#
# @return [String] "_INBOX." followed by 26 random hex characters
#   (13 random bytes from SecureRandom)
def new_inbox
  random_suffix = ::SecureRandom.hex(13)
  "_INBOX." + random_suffix
end

#next_message(sub, timeout_sec) ⇒ Object



90
91
92
93
94
# File 'lib/protobuf/nats/jnats.rb', line 90

# Fetches the next message from a subscription.
#
# The timeout is multiplied by 1000 before being passed to the subscription's
# +nextMessage+, so the underlying call receives milliseconds.
#
# @param sub [Object] subscription responding to +nextMessage+
# @param timeout_sec [Numeric] how long to wait, in seconds
# @return [Message, nil] the wrapped message, or nil when +nextMessage+
#   returned nothing
def next_message(sub, timeout_sec)
  timeout_ms = timeout_sec * 1000
  raw_message = sub.nextMessage(timeout_ms)
  raw_message ? Message.new(raw_message) : nil
end

#on_close(&cb) ⇒ Object



154
155
156
# File 'lib/protobuf/nats/jnats.rb', line 154

# Registers the handler invoked by the closed callback installed in #connect.
#
# The handler is called with no arguments. Calling this method without a
# block restores a no-op handler instead of storing nil, so the connection
# callback never ends up calling +nil.call+.
#
# @yield called with no arguments when the connection is closed
# @return [Proc] the handler now in effect
def on_close(&cb)
  @on_close_cb = cb || lambda {}
end

#on_disconnect(&cb) ⇒ Object



146
147
148
# File 'lib/protobuf/nats/jnats.rb', line 146

# Registers the handler invoked by the disconnected callback installed in
# #connect.
#
# The handler is called with no arguments. Calling this method without a
# block restores a no-op handler instead of storing nil, so the connection
# callback never ends up calling +nil.call+.
#
# @yield called with no arguments when the connection is lost
# @return [Proc] the handler now in effect
def on_disconnect(&cb)
  @on_disconnect_cb = cb || lambda {}
end

#on_error(&cb) ⇒ Object



150
151
152
# File 'lib/protobuf/nats/jnats.rb', line 150

# Registers the handler invoked by the exception handler installed in
# #connect.
#
# The handler is called with the error as its only argument. Calling this
# method without a block restores a no-op handler instead of storing nil, so
# the connection callback never ends up calling +nil.call+.
#
# @yieldparam error [Object] the error passed on by the exception handler
# @return [Proc] the handler now in effect
def on_error(&cb)
  @on_error_cb = cb || lambda { |_error| }
end

#on_reconnect(&cb) ⇒ Object



142
143
144
# File 'lib/protobuf/nats/jnats.rb', line 142

# Registers the handler invoked by the reconnected callback installed in
# #connect.
#
# The handler is called with no arguments. Calling this method without a
# block restores a no-op handler instead of storing nil, so the connection
# callback never ends up calling +nil.call+.
#
# @yield called with no arguments when the connection is re-established
# @return [Proc] the handler now in effect
def on_reconnect(&cb)
  @on_reconnect_cb = cb || lambda {}
end

#publish(subject, data, mailbox = nil) ⇒ Object



96
97
98
99
# File 'lib/protobuf/nats/jnats.rb', line 96

# Publishes +data+ on +subject+, optionally naming a reply mailbox.
#
# @param subject [String] subject to publish on
# @param data [#to_java_bytes] payload; converted with +to_java_bytes+
# @param mailbox [String, nil] reply subject, or nil for none
# @return [Object] whatever the connection's +publish+ returns
def publish(subject, data, mailbox = nil)
  payload = data.to_java_bytes
  # Force a flush on every publish. May not need this.
  force_flush = true
  connection.publish(subject, mailbox, payload, force_flush)
end

#subscribe(subject, options = {}, &block) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/protobuf/nats/jnats.rb', line 101

# Subscribes to +subject+, synchronously or asynchronously.
#
# With a block the subscription is asynchronous: it is attached to the shared
# @work_queue and the block is stored in @subz_cbs under the subscription's
# sid. Without a block the subscription gets its own freshly created message
# channel.
#
# @param subject [String] subject to subscribe to
# @param options [Hash] +:queue+ is passed through as the second argument of
#   the connection's +subscribe+; +:max+, when present, is passed to the
#   subscription's +autoUnsubscribe+
# @return [Object] the subscription returned by the connection
def subscribe(subject, options = {}, &block)
  queue_group = options[:queue]
  max_messages = options[:max]

  # We pass our work queue for processing async work because java nats
  # uses a cached thread pool: 1 thread per async subscription.
  # Sync subs need their own queue so work is not processed async.
  channel = block ? @work_queue : connection.createMsgChannel
  subscription = connection.subscribe(subject, queue_group, nil, channel)

  # Register the block callback. We only lock to save the callback.
  @subz_mutex.synchronize { @subz_cbs[subscription.getSid] = block } if block

  # Auto unsub if max message option was provided.
  subscription.autoUnsubscribe(max_messages) if max_messages

  subscription
end

#unsubscribe(sub) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/protobuf/nats/jnats.rb', line 124

# Unsubscribes +sub+ and forgets any callback stored for it.
#
# @param sub [Object, nil] subscription responding to +getSid+ and
#   +unsubscribe+; nil is ignored
# @return [Object, nil] the result of the subscription's +unsubscribe+, or
#   nil when +sub+ is nil
def unsubscribe(sub)
  return if sub.nil?

  # Cleanup our async callback. The table is written under @subz_mutex
  # elsewhere, so it is only touched under the lock here as well; deleting
  # a sid that was never registered is a harmless no-op.
  @subz_mutex.synchronize { @subz_cbs.delete(sub.getSid) }

  # The "true" here is to ignore an invalid conn.
  sub.unsubscribe(true)
end