Class: Fluent::Plugin::NatsStreamingInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_nats-streaming.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ NatsStreamingInput

Returns a new instance of NatsStreamingInput.



33
34
35
36
# File 'lib/fluent/plugin/in_nats-streaming.rb', line 33

# Builds the plugin instance; no streaming client exists yet at this point.
def initialize
  super
  # Placeholder for the streaming client, which #run assigns later.
  @sc = nil
end

Instance Method Details

#close ⇒ Object



89
90
91
92
# File 'lib/fluent/plugin/in_nats-streaming.rb', line 89

# Runs the parent's close step, then closes the streaming client if one was
# ever created.
#
# @return [Object, nil] whatever the client's +close+ returns, or nil when
#   there is no client
def close
  super
  # @sc is only set once #run has started, so it may still be nil here.
  return unless @sc

  @sc.close
end

#configure(conf) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/fluent/plugin/in_nats-streaming.rb', line 38

# Lets the parent process +conf+, then prepares the two option hashes that
# #run later passes to the streaming client.
#
# @param conf [Object] configuration handed on to the parent implementation
# @return [Hash] the subscription options stored in @sub_opts
def configure(conf)
  super

  # Connection options; the server address is prefixed with the nats:// scheme.
  nats_url = "nats://#{server}"
  @sc_config = {
    servers: [nats_url],
    reconnect_time_wait: @reconnect_time_wait,
    max_reconnect_attempts: @max_reconnect_attempts
  }

  # Subscription options: the configured values first, then the fixed ones.
  configured_opts = { queue: @queue, durable_name: @durable_name }
  fixed_opts = {
    start_at: :first,
    deliver_all_available: true,
    ack_wait: 10, # seconds
    connect_timeout: 2 # seconds
  }
  @sub_opts = configured_opts.merge(fixed_opts)
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


29
30
31
# File 'lib/fluent/plugin/in_nats-streaming.rb', line 29

# Declares that this plugin may be run with multiple workers.
#
# NOTE(review): #run connects using an id derived only from @client_id, so
# every worker would presumably present the same client id — confirm the
# server accepts that before relying on multi-worker mode.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#run ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/fluent/plugin/in_nats-streaming.rb', line 62

# Thread body: connects the streaming client, subscribes to the channel and
# then keeps probing the connection while the thread is supposed to run.
#
# Every received message is emitted with the channel name as its tag. The
# payload is parsed as JSON; if parsing fails the raw payload is emitted
# instead, and a nil parse result is replaced by an empty hash.
def run
  @sc = STAN::Client.new

  log.info "connect nats server nats://#{server} #{cluster_id} #{client_id}"
  # Dots in the configured client id are replaced by underscores for the
  # connection (the log line above shows the unmodified id).
  connection_id = @client_id.tr('.', '_')
  @sc.connect(@cluster_id, connection_id, nats: @sc_config)
  log.info "connected"

  log.info "subscribe #{channel} #{queue} #{durable_name}"
  @sc.subscribe(@channel, @sub_opts) do |msg|
    record =
      begin
        JSON.parse(msg.data)
      rescue JSON::ParserError => e
        log.error "Failed parsing JSON #{e.inspect}.  Passing as a normal string"
        msg.data
      end
    router.emit(@channel, Fluent::Engine.now, record || {})
  end

  # Periodic probe: flush the underlying connection, then pause before the
  # next round. An exception raised by flush is not rescued here.
  while thread_current_running?
    log.trace "test connection"
    @sc.nats.flush(@reconnect_time_wait)
    sleep(5)
  end
end

#start ⇒ Object



57
58
59
60
# File 'lib/fluent/plugin/in_nats-streaming.rb', line 57

# Runs the parent's start step, then hands #run to +thread_create+ under the
# title :nats_streaming_input_main.
#
# @return [Object] whatever +thread_create+ returns
def start
  super
  thread_create(:nats_streaming_input_main) { run }
end

#terminate ⇒ Object



94
95
96
97
# File 'lib/fluent/plugin/in_nats-streaming.rb', line 94

# Runs the parent's terminate step and drops the reference to the streaming
# client.
def terminate
  super
  # Closing the client is done in #close; here only the reference is cleared.
  @sc = nil
end