Class: Fluent::Plugin::SakuraIOOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_sakuraio.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



17
18
19
20
21
22
# File 'lib/fluent/plugin/out_sakuraio.rb', line 17

# Hands the configuration to the parent implementation, then makes sure an
# EventMachine reactor is available and spawns the +:out_sakuraio+ thread
# that executes #run.
#
# @param conf the configuration object, forwarded unchanged to +super+
# @return [Object] whatever +thread_create+ returns
def configure(conf)
  super

  ensure_reactor_running
  # An explicit block instead of a Method object; it simply invokes #run.
  thread_create(:out_sakuraio) { run }
end

#encode_record(mod, record) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/fluent/plugin/out_sakuraio.rb', line 67

# Builds the "channels" message for one module and serialises it with
# Yajl::Encoder.
#
# Every entry of +@channels+ is iterated as (channel, value); the value is
# destructured into a record key and a type. The channel is converted with
# +to_i+ and the value is looked up in +record+ by that key (nil when absent).
#
# @param mod the value placed under the 'module' key of the message
# @param record [#[]] the event record the channel values are read from
# @return [Object] the result of Yajl::Encoder.encode for the message hash
def encode_record(mod, record)
  channel_entries = @channels.map do |channel, (key, type)|
    { 'channel' => channel.to_i, 'type' => type, 'value' => record[key] }
  end

  message = {
    'type' => 'channels',
    'module' => mod,
    'payload' => { 'channels' => channel_entries }
  }
  Yajl::Encoder.encode(message)
end

#ensure_reactor_running ⇒ Object



24
25
26
27
28
29
30
# File 'lib/fluent/plugin/out_sakuraio.rb', line 24

# Starts an EventMachine reactor in its own +:out_sakuraio_reactor+ thread,
# unless EM reports that a reactor is already running.
#
# @return [Object, nil] the result of +thread_create+, or nil when a reactor
#   was already running
def ensure_reactor_running
  thread_create(:out_sakuraio_reactor) { EM.run } unless EM.reactor_running?
end

#process(_tag, events) ⇒ Object



56
57
58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/out_sakuraio.rb', line 56

# Sends every event to every configured module over the websocket client.
#
# For each (time, record) pair the record is encoded once per entry of
# +modules+ via #encode_record and handed to +@client.send+; both the raw
# record and the encoded JSON are logged at debug level.
#
# @param _tag unused
# @param events [#each] yields (time, record) pairs; the time is ignored
# @return [Object] the result of +events.each+
def process(_tag, events)
  events.each do |_time, record|
    log.debug "sakuraio: process record #{record}"
    modules.each do |target_module|
      json = encode_record(target_module, record)
      log.debug "sakuraio: encoded json #{json}"
      @client.send(json)
    end
  end
end

#run ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/fluent/plugin/out_sakuraio.rb', line 32

# Opens a Faye websocket client for +@url+, stores it in +@client+ and
# registers its event handlers on the next reactor tick.
#
# The +:ping+ option is only passed when +@ping+ is positive. The handlers
# log open/message/error/close events; the close handler also calls #run
# again, which creates a fresh client.
#
# NOTE(review): the reconnect in the close handler happens immediately, with
# no delay or retry limit - confirm this is intended.
#
# @return [Object] the result of +EM.next_tick+
def run
  client_options = @ping.positive? ? { ping: @ping } : {}
  @client = Faye::WebSocket::Client.new(@url, nil, client_options)

  EM.next_tick do
    @client.on(:open) do
      log.info "sakuraio: starting websocket connection for #{@url}."
    end

    @client.on(:message) do |event|
      log.debug "sakuraio: received message #{event.data}"
    end

    @client.on(:error) do |event|
      log.warn "sakuraio: #{event.message}"
    end

    @client.on(:close) do |event|
      log.warn "sakuraio: #{event.code} #{event.reason}"
      # Reconnect by building a new client and handlers from scratch.
      run
    end
  end
end